/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank.window;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.window.WindowRankOperatorBuilder;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class WindowRankOperatorTest {
    /** Input schema: f0 is the partition key, f1 the sort key, f2 the window end. */
    private static final RowType INPUT_ROW_TYPE = new RowType(Arrays.asList(
            new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
            new RowType.RowField("f1", new IntType()),
            new RowType.RowField("f2", new BigIntType())));
    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);

    /** Selects field 0 (f0) as the key of the keyed test harness. */
    private static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector(
            new int[]{0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
    private static final PagedTypeSerializer<RowData> KEY_SER =
            (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();

    /** Hand-written stand-in for a code-generated comparator; always yields {@link IntRecordComparator}. */
    private static final GeneratedRecordComparator GENERATED_SORT_KEY_COMPARATOR =
            new GeneratedRecordComparator("", "", new Object[0]) {
                private static final long serialVersionUID = 1L;

                @Override
                public RecordComparator newInstance(ClassLoader classLoader) {
                    return IntRecordComparator.INSTANCE;
                }
            };

    /** Index of the field the rank is ordered by (f1). */
    private static final int SORT_KEY_IDX = 1;
    private static final RowDataKeySelector SORT_KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector(
            new int[]{SORT_KEY_IDX}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));

    /** Index of the field carrying the window end (f2). */
    private static final int WINDOW_END_INDEX = 2;

    /** Output layout when the rank number is emitted: the three input fields plus a BIGINT rank. */
    private static final LogicalType[] OUTPUT_TYPES = new LogicalType[]{
            new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType(), new BigIntType()};
    private static final TypeSerializer<RowData> OUT_SERIALIZER = new RowDataSerializer(OUTPUT_TYPES);
    private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(
            OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));

    /** Output layout when the rank number is not emitted: identical to the input fields. */
    private static final LogicalType[] OUTPUT_TYPES_WITHOUT_RANK_NUMBER = new LogicalType[]{
            new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()};
    private static final TypeSerializer<RowData> OUT_SERIALIZER_WITHOUT_RANK_NUMBER =
            new RowDataSerializer(OUTPUT_TYPES_WITHOUT_RANK_NUMBER);
    private static final RowDataHarnessAssertor ASSERTER_WITHOUT_RANK_NUMBER = new RowDataHarnessAssertor(
            OUTPUT_TYPES_WITHOUT_RANK_NUMBER, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));

    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");

    /** Time zone passed to the operator builder and used to compute the window-end values of test records. */
    private final ZoneId shiftTimeZone;

    /**
     * Creates one parameterized instance of the test.
     *
     * @param shiftTimeZone the time zone this instance runs with; the values come from
     *     {@link #runMode()}
     */
    public WindowRankOperatorTest(ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
    }

    @Parameterized.Parameters(name="TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    /**
     * Wraps the given operator in a keyed test harness that keys records with {@code KEY_SELECTOR}.
     *
     * @param operator the window rank operator under test
     * @return a new, not yet set-up harness around {@code operator}
     * @throws Exception if the harness cannot be constructed
     */
    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(WindowAggOperator<RowData, ?> operator) throws Exception {
        // Diamond instead of a raw type so the key/input/output type arguments are checked by the compiler.
        return new KeyedOneInputStreamOperatorTestHarness<>(
                operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
    }

    /**
     * Ranks records per key and window end (ranks 1..2, rank number emitted), checks the output
     * after each watermark, restores the operator from a snapshot, and finally verifies that a
     * record arriving behind the watermark is counted as late and produces no output.
     */
    @Test
    public void testTop2Windows() throws Exception {
        WindowAggOperator<RowData, ?> operator = WindowRankOperatorBuilder.builder()
                .inputSerializer(INPUT_ROW_SER)
                .shiftTimeZone(this.shiftTimeZone)
                .keySerializer(KEY_SER)
                .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
                .sortKeySelector(SORT_KEY_SELECTOR)
                .outputRankNumber(true)
                .rankStart(1L)
                .rankEnd(2L)
                .windowEndIndex(WINDOW_END_INDEX)
                .build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        // Window-end values written into field f2 of the records, computed for the zone under test.
        long end999 = TimeWindowUtil.toUtcTimestampMills(999L, this.shiftTimeZone);
        long end1999 = TimeWindowUtil.toUtcTimestampMills(1999L, this.shiftTimeZone);
        long end3999 = TimeWindowUtil.toUtcTimestampMills(3999L, this.shiftTimeZone);

        // Records for "key2".
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 7, end3999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 8, end3999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, end3999));

        // Records for "key1".
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 7, end1999));

        // Watermark 999: the two smallest f1 values per key for window end 999, ranked 1 and 2.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 1, end999, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2, end999, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1, end999, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3, end999, 2L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Watermark 1999: key2 has a single record for this window end, so only rank 1 is expected for it.
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 4, end1999, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 6, end1999, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2, end1999, 1L));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, close, and restore into a fresh harness; its output queue starts empty.
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.initializeState(snapshot);
        testHarness.open();

        // Watermark 3999: the records for window end 3999 were processed before the snapshot.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1, end3999, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 7, end3999, 2L));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // A record whose window end (3500) lies behind watermark 3999 must only bump the late counter.
        long lateWindowEnd = TimeWindowUtil.toUtcTimestampMills(3500L, this.shiftTimeZone);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, lateWindowEnd));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    /**
     * Same input as the plain top-2 scenario, but with {@code rankStart == rankEnd == 2}: only the
     * record ranked second per key and window end is expected, before and after a snapshot restore.
     */
    @Test
    public void testTop2WindowsWithOffset() throws Exception {
        WindowAggOperator<RowData, ?> operator = WindowRankOperatorBuilder.builder()
                .inputSerializer(INPUT_ROW_SER)
                .shiftTimeZone(this.shiftTimeZone)
                .keySerializer(KEY_SER)
                .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
                .sortKeySelector(SORT_KEY_SELECTOR)
                .outputRankNumber(true)
                .rankStart(2L)
                .rankEnd(2L)
                .windowEndIndex(WINDOW_END_INDEX)
                .build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        // Window-end values written into field f2 of the records, computed for the zone under test.
        long end999 = TimeWindowUtil.toUtcTimestampMills(999L, this.shiftTimeZone);
        long end1999 = TimeWindowUtil.toUtcTimestampMills(1999L, this.shiftTimeZone);
        long end3999 = TimeWindowUtil.toUtcTimestampMills(3999L, this.shiftTimeZone);

        // Records for "key2".
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 7, end3999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 8, end3999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, end3999));

        // Records for "key1".
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 7, end1999));

        // Watermark 999: only the second-ranked record per key is expected.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2, end999, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3, end999, 2L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Watermark 1999: key2 has a single record for this window end, hence no rank-2 row for it.
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 6, end1999, 2L));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, close, and restore into a fresh harness; its output queue starts empty.
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.initializeState(snapshot);
        testHarness.open();

        // Watermark 3999: the records for window end 3999 were processed before the snapshot.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 7, end3999, 2L));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Same input as the plain top-2 scenario, but with {@code outputRankNumber(false)}: the expected
     * rows carry only the three input fields, before and after a snapshot restore.
     */
    @Test
    public void testTop2WindowsWithoutRankNumber() throws Exception {
        WindowAggOperator<RowData, ?> operator = WindowRankOperatorBuilder.builder()
                .inputSerializer(INPUT_ROW_SER)
                .shiftTimeZone(this.shiftTimeZone)
                .keySerializer(KEY_SER)
                .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
                .sortKeySelector(SORT_KEY_SELECTOR)
                .outputRankNumber(false)
                .rankStart(1L)
                .rankEnd(2L)
                .windowEndIndex(WINDOW_END_INDEX)
                .build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER_WITHOUT_RANK_NUMBER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        // Window-end values written into field f2 of the records, computed for the zone under test.
        long end999 = TimeWindowUtil.toUtcTimestampMills(999L, this.shiftTimeZone);
        long end1999 = TimeWindowUtil.toUtcTimestampMills(1999L, this.shiftTimeZone);
        long end3999 = TimeWindowUtil.toUtcTimestampMills(3999L, this.shiftTimeZone);

        // Records for "key2".
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 7, end3999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 8, end3999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, end3999));

        // Records for "key1".
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, end999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6, end1999));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 7, end1999));

        // Watermark 999: the two smallest f1 values per key, emitted without a rank column.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 1, end999));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2, end999));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1, end999));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3, end999));
        expectedOutput.add(new Watermark(999L));
        ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Watermark 1999: key2 has a single record for this window end.
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 4, end1999));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 6, end1999));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2, end1999));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, close, and restore into a fresh harness; its output queue starts empty.
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER_WITHOUT_RANK_NUMBER);
        testHarness.initializeState(snapshot);
        testHarness.open();

        // Watermark 3999: the records for window end 3999 were processed before the snapshot.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1, end3999));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 7, end3999));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }
}

