/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.table.runtime.operators.wmassigners.BoundedOutOfOrderWatermarkGenerator;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorFactory;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTestBase;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class WatermarkAssignerOperatorTest
extends WatermarkAssignerOperatorTestBase {
    /**
     * Generator shared by every test in this class that does not install its own. It is built with
     * the arguments {@code (0, 1L)} — presumably the rowtime field index and the allowed
     * out-of-orderness delay; TODO confirm against {@link BoundedOutOfOrderWatermarkGenerator}.
     */
    private static final WatermarkGenerator WATERMARK_GENERATOR = new BoundedOutOfOrderWatermarkGenerator(0, 1L);

    /**
     * Pins the result of {@code WatermarkAssignerOperator.calculateProcessingTimeTimerInterval}
     * for a set of argument pairs, each checked in both argument orders where applicable.
     */
    @Test
    public void testCalculateProcessingTimeTimerInterval() {
        // One argument is zero or negative: the other one is returned unchanged.
        assertTimerInterval(5L, 0L, 5L);
        assertTimerInterval(5L, -1L, 5L);
        assertTimerInterval(0L, 5L, 5L);
        assertTimerInterval(-1L, 5L, 5L);

        // Arguments far apart: the smaller one is returned.
        assertTimerInterval(5L, 42L, 5L);
        assertTimerInterval(42L, 5L, 5L);

        // Arguments close together: a value below both of them is returned.
        assertTimerInterval(2L, 4L, 1L);
        assertTimerInterval(4L, 2L, 1L);
        assertTimerInterval(100L, 110L, 20L);
        assertTimerInterval(110L, 100L, 20L);
    }

    /**
     * Asserts that {@code calculateProcessingTimeTimerInterval(first, second)} equals
     * {@code expected}.
     */
    private static void assertTimerInterval(long first, long second, long expected) {
        long actual = WatermarkAssignerOperator.calculateProcessingTimeTimerInterval(first, second);
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Exercises idleness detection with an idle timeout of 1000 and an auto-watermark interval of
     * 50: a watermark is emitted once processing time advances, the source is reported
     * {@link WatermarkStatus#IDLE} after no records arrive for long enough, and it is reported
     * {@link WatermarkStatus#ACTIVE} again as soon as a new record is processed.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void testWatermarkAssignerWithIdleSource() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                WatermarkAssignerOperatorTest.createTestHarness(0, WATERMARK_GENERATOR, 1000L);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);
        testHarness.open();

        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        List<Object> expectedOutput = new ArrayList<>();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L)));
        // The upstream watermark is not expected to show up in the operator's output.
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(3L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));

        // Advancing processing time past the watermark interval triggers a watermark.
        testHarness.setProcessingTime(51L);
        expectedOutput.add(new Watermark(3L));
        Assertions.assertThat(this.filterOutRecords(output)).isEqualTo(expectedOutput);

        // Fire the timer repeatedly; the source must not be reported idle yet.
        this.stepProcessingTime(testHarness, 52L, 1050L, 50L);
        Assertions.assertThat(this.filterOutRecords(output)).isEqualTo(expectedOutput);

        // Now the source is expected to become idle.
        this.stepProcessingTime(testHarness, 1051L, 1100L, 50L);
        expectedOutput.add(WatermarkStatus.IDLE);
        Assertions.assertThat(this.filterOutRecords(output)).isEqualTo(expectedOutput);

        // The first new record is expected to switch the status back to ACTIVE immediately.
        expectedOutput.add(WatermarkStatus.ACTIVE);
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(5L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(6L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(7L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L)));
        Assertions.assertThat(this.filterOutRecords(output)).isEqualTo(expectedOutput);

        // The next timer firings emit the watermark for the newly seen records.
        this.stepProcessingTime(testHarness, 1101L, 1200L, 50L);
        expectedOutput.add(new Watermark(7L));
        Assertions.assertThat(this.filterOutRecords(output)).isEqualTo(expectedOutput);
    }

    /**
     * Runs the shared idle-timeout scenario with the watermark interval (50) smaller than the idle
     * timeout (1000).
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void testWatermarkIntervalSmallerThanIdleTimeout() throws Exception {
        final long idleTimeout = 1000L;
        final long watermarkInterval = 50L;
        this.testIdleTimeout(idleTimeout, watermarkInterval);
    }

    /**
     * Runs the shared idle-timeout scenario with the idle timeout (50) smaller than the watermark
     * interval (1000).
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void testIdleTimeoutSmallerThanWatermarkInterval() throws Exception {
        final long idleTimeout = 50L;
        final long watermarkInterval = 1000L;
        this.testIdleTimeout(idleTimeout, watermarkInterval);
    }

    /**
     * Feeds ten records spaced at 90% of {@code idleTimeout} apart, stepping processing time
     * between them, and asserts that {@link WatermarkStatus#IDLE} is never emitted.
     *
     * @param idleTimeout idle timeout handed to the operator factory
     * @param watermarkInterval auto-watermark interval set on the execution config
     * @throws Exception if the test harness fails
     */
    private void testIdleTimeout(long idleTimeout, long watermarkInterval) throws Exception {
        // Step with the finer of the two intervals so that no timer firing is skipped.
        long step = Math.min(idleTimeout, watermarkInterval);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                WatermarkAssignerOperatorTest.createTestHarness(0, WATERMARK_GENERATOR, idleTimeout);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
        testHarness.open();

        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();

        // Records arrive slightly more often than the idle timeout, so idleness must not trigger.
        long timeBetweenRecords = (long) (idleTimeout * 0.9);
        for (long i = 1L; i <= 10L; ++i) {
            long timestamp = i * timeBetweenRecords;
            testHarness.processElement(new StreamRecord<>(GenericRowData.of(timestamp), timestamp));
            this.stepProcessingTime(testHarness, timestamp, timestamp + timeBetweenRecords - 1L, step);
        }

        Assertions.assertThat(this.extractWatermarkStatuses(output)).doesNotContain(WatermarkStatus.IDLE);
    }

    /**
     * Advances the harness' processing time from {@code fromInclusive} towards {@code toInclusive}
     * in increments of {@code step}, and finally sets it to exactly {@code toInclusive}.
     *
     * @param testHarness harness whose processing time is advanced
     * @param fromInclusive first processing time to set (skipped if not below {@code toInclusive})
     * @param toInclusive last processing time to set; always set, even if not on the step grid
     * @param step increment between consecutive processing times; must be positive
     * @throws IllegalArgumentException if {@code step} is not positive
     * @throws Exception if the harness fails while advancing time
     */
    private void stepProcessingTime(OneInputStreamOperatorTestHarness<?, ?> testHarness, long fromInclusive, long toInclusive, long step) throws Exception {
        // A zero or negative step would keep the loop below from ever reaching toInclusive.
        if (step <= 0L) {
            throw new IllegalArgumentException("step must be positive, but was " + step);
        }
        for (long time = fromInclusive; time < toInclusive; time += step) {
            testHarness.setProcessingTime(time);
        }
        testHarness.setProcessingTime(toInclusive);
    }

    /**
     * Runs the operator without an idle timeout ({@code -1}) and a 50 auto-watermark interval,
     * validating in two rounds that records and periodic watermarks come out in a consistent
     * order, and finally that a {@code Long.MAX_VALUE} watermark is forwarded.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void testWatermarkAssignerOperator() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                WatermarkAssignerOperatorTest.createTestHarness(0, WATERMARK_GENERATOR, -1L);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);
        testHarness.open();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L)));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(3L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));

        // First round: elements start at 1, no watermark seen yet, stop once watermark 3 arrives.
        long currentTime = this.validateOutputUntilWatermark(testHarness, 1L, -1L, 3L, 0L);

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(5L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(6L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(7L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L)));

        // Second round: elements start at 4, stop once watermark 7 arrives.
        this.validateOutputUntilWatermark(testHarness, 4L, 2L, 7L, currentTime);

        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        Watermark finalWatermark = (Watermark) testHarness.getOutput().poll();
        Assertions.assertThat(finalWatermark.getTimestamp()).isEqualTo(Long.MAX_VALUE);
    }

    /**
     * Polls and validates the harness output until a watermark of at least
     * {@code targetWatermark} has been observed, advancing processing time by 10 whenever the
     * output is empty, and clears any remaining output afterwards.
     *
     * @param testHarness harness whose output is consumed
     * @param nextElementValue value expected for the next record, passed to {@code validateElement}
     * @param lastWatermark last watermark considered seen, passed to {@code validateElement}
     * @param targetWatermark watermark value at which validation stops
     * @param currentTime processing time to continue advancing from
     * @return the processing time reached when validation stopped
     * @throws Exception if the harness fails while advancing time
     */
    private long validateOutputUntilWatermark(
            OneInputStreamOperatorTestHarness<RowData, RowData> testHarness,
            long nextElementValue,
            long lastWatermark,
            long targetWatermark,
            long currentTime) throws Exception {
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        while (lastWatermark < targetWatermark) {
            if (output.isEmpty()) {
                // Nothing to validate yet: move time forward so the watermark timer can fire.
                currentTime += 10L;
                testHarness.setProcessingTime(currentTime);
                continue;
            }
            Object next = output.poll();
            Assertions.assertThat(next).isNotNull();
            Tuple2<Long, Long> update = this.validateElement(next, nextElementValue, lastWatermark);
            nextElementValue = update.f0;
            lastWatermark = update.f1;
            // A watermark must never reach the value of a record that is still to come.
            Assertions.assertThat(lastWatermark).isLessThan(nextElementValue);
        }
        output.clear();
        return currentTime;
    }

    /**
     * Runs the operator with {@link MyWatermarkGenerator}, which reads the watermark from field 1
     * of each row (possibly {@code null}), and checks the exact sequence of emitted watermarks,
     * that the generator's {@code open}/{@code close} hooks were invoked, and the total output
     * size.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void testCustomizedWatermarkGenerator() throws Exception {
        // The flags are static, so reset them in case another run already flipped them.
        MyWatermarkGenerator.openCalled = false;
        MyWatermarkGenerator.closeCalled = false;

        WatermarkGenerator generator = new MyWatermarkGenerator(1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                WatermarkAssignerOperatorTest.createTestHarness(0, generator, -1L);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(5L);

        long currentTime = 0L;
        List<Watermark> expected = new ArrayList<>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L, 0L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L, 1L)));
        // The upstream watermark is not part of the expected output.
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(3L, 1L)));
        currentTime += 5L;
        testHarness.setProcessingTime(currentTime);
        expected.add(new Watermark(1L));

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(4L, 2L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(2L, 1L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(1L, 0L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(6L, null)));
        currentTime += 5L;
        testHarness.setProcessingTime(currentTime);
        expected.add(new Watermark(2L));

        // This watermark is expected without advancing processing time.
        // NOTE(review): presumably because the jump from 2 to 8 exceeds the watermark interval;
        // confirm against WatermarkAssignerOperator.
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(9L, 8L)));
        expected.add(new Watermark(8L));

        testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L, 7L)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(10L, null)));
        testHarness.processElement(new StreamRecord<>(GenericRowData.of(11L, 10L)));
        currentTime += 5L;
        testHarness.setProcessingTime(currentTime);
        expected.add(new Watermark(10L));

        testHarness.close();
        expected.add(Watermark.MAX_WATERMARK);

        Collection<Object> allOutput = testHarness.getOutput();
        List<Watermark> results = this.extractWatermarks(testHarness.getOutput());
        Assertions.assertThat(results).isEqualTo(expected);
        Assertions.assertThat(MyWatermarkGenerator.openCalled).isTrue();
        Assertions.assertThat(MyWatermarkGenerator.closeCalled).isTrue();
        // 11 records were processed above; the output holds them plus the expected watermarks.
        Assertions.assertThat(allOutput).hasSize(expected.size() + 11);
    }

    /**
     * Builds a test harness around a {@link WatermarkAssignerOperatorFactory} whose generated
     * watermark generator always hands back the given {@code watermarkGenerator} instance instead
     * of instantiating generated code.
     *
     * @param rowtimeFieldIndex rowtime field index passed to the operator factory
     * @param watermarkGenerator generator instance the operator should end up using
     * @param idleTimeout idle timeout passed to the operator factory
     * @return a harness wrapping the configured operator factory (not yet opened)
     * @throws Exception if the harness cannot be created
     */
    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(int rowtimeFieldIndex, final WatermarkGenerator watermarkGenerator, long idleTimeout) throws Exception {
        // Both newInstance variants are short-circuited so no code generation/compilation happens.
        GeneratedWatermarkGenerator generatedGenerator =
                new GeneratedWatermarkGenerator(watermarkGenerator.getClass().getName(), "", new Object[0]) {

                    @Override
                    public WatermarkGenerator newInstance(ClassLoader classLoader) {
                        return watermarkGenerator;
                    }

                    @Override
                    public WatermarkGenerator newInstance(ClassLoader classLoader, Object... args) {
                        return watermarkGenerator;
                    }
                };
        WatermarkAssignerOperatorFactory operatorFactory =
                new WatermarkAssignerOperatorFactory(rowtimeFieldIndex, idleTimeout, generatedGenerator);
        return new OneInputStreamOperatorTestHarness<>(operatorFactory);
    }

    private static final class MyWatermarkGenerator
    extends WatermarkGenerator {
        private static final long serialVersionUID = 1L;
        private static boolean openCalled = false;
        private static boolean closeCalled = false;
        private final int watermarkFieldIndex;

        private MyWatermarkGenerator(int watermarkFieldIndex) {
            this.watermarkFieldIndex = watermarkFieldIndex;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (closeCalled) {
                Assertions.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        @Nullable
        public Long currentWatermark(RowData row) throws Exception {
            if (!openCalled) {
                Assertions.fail((String)"Open was not called before run.");
            }
            if (row.isNullAt(this.watermarkFieldIndex)) {
                return null;
            }
            return row.getLong(this.watermarkFieldIndex);
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assertions.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }
    }
}

