/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class SourceMetricsITCase
extends TestLogger {
    private static final int DEFAULT_PARALLELISM = 4;
    private static final long EVENTTIME_LAG = Duration.ofDays(100L).toMillis();
    private static final long WATERMARK_LAG = Duration.ofDays(1L).toMillis();
    private static final long EVENTTIME_EPSILON = Duration.ofDays(20L).toMillis();
    private static final long WATERMARK_EPSILON = Duration.ofHours(6L).toMillis();
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).setConfiguration(reporter.addToConfiguration(new Configuration())).build());

    @Test
    public void testMetricsWithTimestamp() throws Exception {
        long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
        WatermarkStrategy strategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)context -> new EagerBoundedOutOfOrdernessWatermarks()).withTimestampAssigner((SerializableTimestampAssigner)new LaggingTimestampAssigner(baseTime));
        this.testMetrics((WatermarkStrategy<Integer>)strategy, true);
    }

    @Test
    public void testMetricsWithoutTimestamp() throws Exception {
        this.testMetrics((WatermarkStrategy<Integer>)WatermarkStrategy.noWatermarks(), false);
    }

    private void testMetrics(WatermarkStrategy<Integer> strategy, boolean hasTimestamps) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int numSplits = Math.max(1, env.getParallelism() - 2);
        env.getConfig().setAutoWatermarkInterval(1L);
        int numRecordsPerSplit = 10;
        MockBaseSource source = new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED);
        SharedReference beforeBarrier = this.sharedObjects.add((Object)new CyclicBarrier(numSplits + 1));
        SharedReference afterBarrier = this.sharedObjects.add((Object)new CyclicBarrier(numSplits + 1));
        int stopAtRecord1 = 3;
        int stopAtRecord2 = numRecordsPerSplit - 1;
        SingleOutputStreamOperator stream = env.fromSource((Source)source, strategy, "MetricTestingSource").map((MapFunction & Serializable)i -> {
            if (i % numRecordsPerSplit == stopAtRecord1 || i % numRecordsPerSplit == stopAtRecord2) {
                ((CyclicBarrier)beforeBarrier.get()).await();
                ((CyclicBarrier)afterBarrier.get()).await();
            }
            return i;
        });
        stream.addSink((SinkFunction)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        JobID jobId = jobClient.getJobID();
        ((CyclicBarrier)beforeBarrier.get()).await();
        this.assertSourceMetrics(jobId, reporter, stopAtRecord1 + 1, numRecordsPerSplit, env.getParallelism(), numSplits, hasTimestamps);
        ((CyclicBarrier)afterBarrier.get()).await();
        ((CyclicBarrier)beforeBarrier.get()).await();
        this.assertSourceMetrics(jobId, reporter, stopAtRecord2 + 1, numRecordsPerSplit, env.getParallelism(), numSplits, hasTimestamps);
        ((CyclicBarrier)afterBarrier.get()).await();
        jobClient.getJobExecutionResult().get();
    }

    private void assertSourceMetrics(JobID jobId, InMemoryReporter reporter, long processedRecordsPerSubtask, long numTotalPerSubtask, int parallelism, int numSplits, boolean hasTimestamps) {
        List groups = reporter.findOperatorMetricGroups(jobId, "MetricTestingSource");
        Assertions.assertThat((List)groups).hasSize(parallelism);
        int subtaskWithMetrics = 0;
        for (OperatorMetricGroup group : groups) {
            Map metrics = reporter.getMetricsByGroup((MetricGroup)group);
            if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0L) {
                MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("currentEmitEventTimeLag"))).isEqualTo((Object)-1L);
                Assertions.assertThat(metrics.get("watermarkLag")).isNull();
                continue;
            }
            ++subtaskWithMetrics;
            MetricAssertions.assertThatCounter((Metric)group.getIOMetricGroup().getNumRecordsInCounter()).isEqualTo((Object)processedRecordsPerSubtask);
            MetricAssertions.assertThatCounter((Metric)group.getIOMetricGroup().getNumBytesInCounter()).isEqualTo((Object)(processedRecordsPerSubtask * 10L));
            MetricAssertions.assertThatCounter((Metric)((Metric)metrics.get("numRecordsInErrors"))).isEqualTo((Object)(processedRecordsPerSubtask / 2L));
            if (hasTimestamps) {
                MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("currentEmitEventTimeLag"))).isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON);
                MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("watermarkLag"))).isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON);
                Long watermarkLag = (Long)((Gauge)metrics.get("watermarkLag")).getValue() - (Long)((Gauge)metrics.get("currentEmitEventTimeLag")).getValue();
                Assertions.assertThat((Long)watermarkLag).isGreaterThan(WATERMARK_LAG - WATERMARK_EPSILON).isLessThan(WATERMARK_LAG + WATERMARK_EPSILON);
            } else {
                MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("currentEmitEventTimeLag"))).isEqualTo((Object)-1L);
                Assertions.assertThat(metrics.get("watermarkLag")).isNull();
            }
            long pendingRecords = numTotalPerSubtask - processedRecordsPerSubtask;
            MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("pendingRecords"))).isEqualTo((Object)pendingRecords);
            MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("pendingBytes"))).isEqualTo((Object)(pendingRecords * 10L));
            MetricAssertions.assertThatGauge((Metric)((Metric)metrics.get("sourceIdleTime"))).isEqualTo((Object)0L);
        }
        Assertions.assertThat((int)subtaskWithMetrics).isEqualTo(numSplits);
    }

    private static class EagerBoundedOutOfOrdernessWatermarks
    extends BoundedOutOfOrdernessWatermarks<Integer> {
        public EagerBoundedOutOfOrdernessWatermarks() {
            super(Duration.ofMillis(WATERMARK_LAG));
        }

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            super.onEvent((Object)event, eventTimestamp, output);
            this.onPeriodicEmit(output);
        }
    }

    private static class LaggingTimestampAssigner
    implements SerializableTimestampAssigner<Integer> {
        private final long baseTime;

        public LaggingTimestampAssigner(long baseTime) {
            this.baseTime = baseTime;
        }

        public long extractTimestamp(Integer i, long ts) {
            return this.baseTime + (long)i.intValue();
        }
    }
}

