/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.window.WindowAggOperatorBuilder;
import org.apache.flink.table.runtime.operators.aggregate.window.WindowAggOperatorTestBase;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigners;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnslicingWindowAggOperatorTest
extends WindowAggOperatorTestBase {
    public UnslicingWindowAggOperatorTest(ZoneId shiftTimeZone) {
        super(shiftTimeZone);
    }

    @Test
    public void testEventTimeSessionWindows() throws Exception {
        UnsliceAssigners.SessionUnsliceAssigner assigner = UnsliceAssigners.session((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L));
        UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction();
        WindowAggOperator operator = WindowAggOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner((WindowAssigner)assigner).aggregate(UnslicingWindowAggOperatorTest.createGeneratedAggsHandle(aggsFunction), (AbstractRowDataSerializer)ACC_SER).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)20L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)0L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1998L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1000L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3999L)));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3500L)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 6L, 6L, this.localMills(1000L), this.localMills(6999L)));
        expectedOutput.add(new Watermark(6999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(7999L));
        testHarness.processWatermark(new Watermark(8999L));
        expectedOutput.add(new Watermark(7999L));
        expectedOutput.add(new Watermark(8999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    @Test
    public void testEventTimeSessionWindowsWithChangelog() throws Exception {
        UnsliceAssigners.SessionUnsliceAssigner assigner = UnsliceAssigners.session((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L));
        UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction();
        WindowAggOperator operator = WindowAggOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner((WindowAssigner)assigner).aggregate(UnslicingWindowAggOperatorTest.createGeneratedAggsHandle(aggsFunction), (AbstractRowDataSerializer)ACC_SER).countStarIndex(1).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)3999L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)20L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)0L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)1998L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)1000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key3", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key4", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key3", 1, TimestampData.fromEpochMillis((long)2900L)));
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key2", 1, TimestampData.fromEpochMillis((long)3999L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 3L, 3L, this.localMills(0L), this.localMills(3999L)));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key4", null, -1L, this.localMills(999L), this.localMills(3999L)));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)3500L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_BEFORE, "key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_AFTER, "key2", 1, TimestampData.fromEpochMillis((long)3800L)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 4L, 4L, this.localMills(1000L), this.localMills(6999L)));
        expectedOutput.add(new Watermark(6999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(7999L));
        testHarness.processWatermark(new Watermark(8999L));
        expectedOutput.add(new Watermark(7999L));
        expectedOutput.add(new Watermark(8999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    @Test
    public void testProcessingTimeSessionWindows() throws Exception {
        UnsliceAssigners.SessionUnsliceAssigner assigner = UnsliceAssigners.session((int)-1, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L));
        UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction();
        WindowAggOperator operator = WindowAggOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner((WindowAssigner)assigner).aggregate(UnslicingWindowAggOperatorTest.createGeneratedAggsHandle(aggsFunction), (AbstractRowDataSerializer)ACC_SER).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:03"));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:06"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:03"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:06")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:03"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:06")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:07"));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:08"));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:12"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:07"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:11")));
        Assertions.assertThat((Long)((Long)operator.getWatermarkLatency().getValue())).isEqualTo((Object)0L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testProcessingTimeSessionWindowsWithChangelog() throws Exception {
        UnsliceAssigners.SessionUnsliceAssigner assigner = UnsliceAssigners.session((int)-1, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L));
        UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction();
        WindowAggOperator operator = WindowAggOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner((WindowAssigner)assigner).aggregate(UnslicingWindowAggOperatorTest.createGeneratedAggsHandle(aggsFunction), (AbstractRowDataSerializer)ACC_SER).countStarIndex(1).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:03"));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key3", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key3", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key4", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        testHarness = UnslicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:06"));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 3L, 3L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:03"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:06")));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 2L, 2L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:03"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:06")));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key4", null, -1L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:03"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:06")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:07"));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:08"));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, TimestampData.fromEpochMillis((long)8000L)));
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:10"));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_BEFORE, "key1", 1, TimestampData.fromEpochMillis((long)8000L)));
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:11"));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_AFTER, "key1", 1, TimestampData.fromEpochMillis((long)6000L)));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:12"));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:13"));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(UnslicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:14"));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 2L, 2L, UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:07"), UnslicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:14")));
        Assertions.assertThat((Long)((Long)operator.getWatermarkLatency().getValue())).isEqualTo((Object)0L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testSessionWindowsWithoutPartitionKey() throws Exception {
        LogicalType[] outputTypes = new LogicalType[]{new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()};
        RowDataHarnessAssertor asserter = new RowDataHarnessAssertor(outputTypes, new GenericRowRecordSortComparator(0, (LogicalType)VarCharType.STRING_TYPE));
        UnsliceAssigners.SessionUnsliceAssigner assigner = UnsliceAssigners.session((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L));
        EmptyRowDataKeySelector keySelector = EmptyRowDataKeySelector.INSTANCE;
        UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction();
        WindowAggOperator operator = WindowAggOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer((PagedTypeSerializer)keySelector.getProducedType().toSerializer()).assigner((WindowAssigner)assigner).aggregate(UnslicingWindowAggOperatorTest.createGeneratedAggsHandle(aggsFunction), (AbstractRowDataSerializer)ACC_SER).build();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keySelector.getProducedType());
        testHarness.setup((TypeSerializer)new RowDataSerializer(outputTypes));
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)20L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        asserter.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key3", 1, TimestampData.fromEpochMillis((long)2990L)));
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        asserter.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keySelector.getProducedType());
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(6999L));
        expectedOutput.add(StreamRecordUtils.insertRecord(3L, 3L, this.localMills(20L), this.localMills(6999L)));
        expectedOutput.add(new Watermark(6999L));
        asserter.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Parameterized.Parameters(name="TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    protected static class UnslicingSumAndCountAggsFunction
    extends WindowAggOperatorTestBase.SumAndCountAggsFunctionBase<TimeWindow> {
        protected UnslicingSumAndCountAggsFunction() {
        }

        @Override
        protected long getWindowStart(TimeWindow window) {
            return window.getStart();
        }

        @Override
        protected long getWindowEnd(TimeWindow window) {
            return window.getEnd();
        }
    }
}

