/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.UUID;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

public class FileReadingWatermarkITCase {
    private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks";
    private static final int FILE_SIZE_LINES = 100000;
    private static final int WATERMARK_INTERVAL_MILLIS = 10;
    private static final int MIN_EXPECTED_WATERMARKS = 5;

    @Test
    public void testWatermarkEmissionWithChaining() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        env.getConfig().setAutoWatermarkInterval(10L);
        Preconditions.checkState((boolean)env.isChainingEnabled());
        env.readTextFile(this.getSourceFile().getAbsolutePath()).assignTimestampsAndWatermarks(FileReadingWatermarkITCase.getExtractorAssigner()).addSink(FileReadingWatermarkITCase.getWatermarkCounter());
        JobExecutionResult result = env.execute();
        int actual = (Integer)result.getAccumulatorResult(NUM_WATERMARKS_ACC_NAME);
        Assert.assertTrue((String)("too few watermarks emitted: " + actual), (actual >= 5 ? 1 : 0) != 0);
    }

    private File getSourceFile() throws IOException {
        File file = File.createTempFile(UUID.randomUUID().toString(), null);
        try (PrintWriter printWriter = new PrintWriter(file);){
            for (int i = 0; i < 100000; ++i) {
                printWriter.println(i);
            }
        }
        file.deleteOnExit();
        return file;
    }

    private static BoundedOutOfOrdernessTimestampExtractor<String> getExtractorAssigner() {
        return new BoundedOutOfOrdernessTimestampExtractor<String>(Time.hours((long)1L)){
            private final long started = System.currentTimeMillis();

            public long extractTimestamp(String line) {
                return this.started + Long.parseLong(line);
            }
        };
    }

    private static SinkFunction<String> getWatermarkCounter() {
        return new RichSinkFunction<String>(){
            private final IntCounter numWatermarks = new IntCounter();
            private long lastWatermark = -1L;

            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                this.getRuntimeContext().addAccumulator(FileReadingWatermarkITCase.NUM_WATERMARKS_ACC_NAME, (Accumulator)this.numWatermarks);
            }

            public void close() throws Exception {
                super.close();
            }

            public void invoke(String value, SinkFunction.Context context) {
                if (context.currentWatermark() != this.lastWatermark) {
                    this.lastWatermark = context.currentWatermark();
                    this.numWatermarks.add(1);
                }
            }
        };
    }
}

