/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.tvf.operator;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.CumulativeWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.operator.AlignedWindowTableFunctionOperator;
import org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorTestBase;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class AlignedWindowTableFunctionOperatorTest
extends WindowTableFunctionOperatorTestBase {
    public AlignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
        super(shiftTimeZone);
    }

    @Parameterized.Parameters(name="TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    @Test
    public void testTumblingWindows() throws Exception {
        TumblingWindowAssigner assigner = TumblingWindowAssigner.of((Duration)Duration.ofSeconds(3L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L));
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(6000L), 5999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L));
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, new Long[]{null}));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)((AlignedWindowTableFunctionOperator)testHarness.getOperator()).getNumNullRowTimeRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        TumblingWindowAssigner assigner = TumblingWindowAssigner.of((Duration)Duration.ofSeconds(3L)).withProcessingTime();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(20L);
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(3999L);
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(3000L), this.localMills(6000L), 5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testHopWindows() throws Exception {
        SlidingWindowAssigner assigner = SlidingWindowAssigner.of((Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L));
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(-2000L), this.localMills(1000L), 999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(-1000L), this.localMills(2000L), 1999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(1000L), this.localMills(4000L), 3999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(2000L), this.localMills(5000L), 4999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(6000L), 5999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(-2000L), this.localMills(1000L), 999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(-1000L), this.localMills(2000L), 1999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testProcessingTimeHopWindows() throws Exception {
        SlidingWindowAssigner assigner = SlidingWindowAssigner.of((Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L)).withProcessingTime();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(20L);
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(3999L);
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(-2000L), this.localMills(1000L), 999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(-1000L), this.localMills(2000L), 1999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(1000L), this.localMills(4000L), 3999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(2000L), this.localMills(5000L), 4999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(3000L), this.localMills(6000L), 5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testCumulativeWindows() throws Exception {
        CumulativeWindowAssigner assigner = CumulativeWindowAssigner.of((Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L));
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(1000L), 999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(2000L), 1999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(4000L), 3999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(5000L), 4999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(6000L), 5999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(1000L), 999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(2000L), 1999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testProcessingCumulativeWindows() throws Exception {
        CumulativeWindowAssigner assigner = CumulativeWindowAssigner.of((Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L)).withProcessingTime();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(20L);
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(3999L);
        testHarness.processElement(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(0L), this.localMills(1000L), 999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(0L), this.localMills(2000L), 1999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(3000L), this.localMills(4000L), 3999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(3000L), this.localMills(5000L), 4999L));
        expectedOutput.add(AlignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(3000L), this.localMills(6000L), 5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testConsumingChangelogRecords() throws Exception {
        TumblingWindowAssigner assigner = TumblingWindowAssigner.of((Duration)Duration.ofSeconds(3L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((GroupWindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_BEFORE, "key1", 1, 30L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_AFTER, "key1", 1, 40L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.UPDATE_BEFORE, "key1", 1, 30L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.UPDATE_AFTER, "key1", 1, 40L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(9999L));
        expectedOutput.add(new Watermark(9999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key1", 1, 200L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key1", 1, 200L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId shiftTimeZone) throws Exception {
        AlignedWindowTableFunctionOperator operator = new AlignedWindowTableFunctionOperator(windowAssigner, 2, shiftTimeZone);
        return new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (TypeSerializer)INPUT_ROW_SER);
    }
}

