/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate.window;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.deduplicate.window.RowTimeWindowDeduplicateOperatorBuilder;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RowTimeWindowDeduplicateOperatorTest {
    private static final RowType INPUT_ROW_TYPE = new RowType(Arrays.asList(new RowType.RowField("f0", (LogicalType)new VarCharType(Integer.MAX_VALUE)), new RowType.RowField("f1", (LogicalType)new BigIntType()), new RowType.RowField("f2", (LogicalType)new BigIntType())));
    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
    private static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
    private static final PagedTypeSerializer<RowData> KEY_SER = (PagedTypeSerializer)KEY_SELECTOR.getProducedType().toSerializer();
    private static final int WINDOW_END_INDEX = 2;
    private static final LogicalType[] OUTPUT_TYPES = new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType()};
    private static final TypeSerializer<RowData> OUT_SERIALIZER = new RowDataSerializer(OUTPUT_TYPES);
    private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(OUTPUT_TYPES, new GenericRowRecordSortComparator(0, (LogicalType)VarCharType.STRING_TYPE));
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
    private final ZoneId shiftTimeZone;

    public RowTimeWindowDeduplicateOperatorTest(ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
    }

    @Parameterized.Parameters(name="TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(WindowAggOperator<RowData, ?> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(operator, (KeySelector)KEY_SELECTOR, (TypeInformation)KEY_SELECTOR.getProducedType());
    }

    @Test
    public void testRowTimeWindowDeduplicateKeepFirstRow() throws Exception {
        WindowAggOperator operator = RowTimeWindowDeduplicateOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).keepLastRow(false).rowtimeIndex(1).windowEndIndex(2).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = RowTimeWindowDeduplicateOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1002L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3007L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3008L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3001L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1004L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1006L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1007L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 1L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 1004L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1002L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = RowTimeWindowDeduplicateOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3001L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3001L, TimeWindowUtil.toUtcTimestampMills((long)3500L, (ZoneId)this.shiftTimeZone)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    @Test
    public void testRowTimeWindowDeduplicateKeepLastRow() throws Exception {
        WindowAggOperator operator = RowTimeWindowDeduplicateOperatorBuilder.builder().inputSerializer((AbstractRowDataSerializer)INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).keepLastRow(true).rowtimeIndex(1).windowEndIndex(2).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = RowTimeWindowDeduplicateOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1002L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3007L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3008L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3001L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1004L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1006L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1007L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 5L, TimeWindowUtil.toUtcTimestampMills((long)999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 1007L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1002L, TimeWindowUtil.toUtcTimestampMills((long)1999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = RowTimeWindowDeduplicateOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3008L, TimeWindowUtil.toUtcTimestampMills((long)3999L, (ZoneId)this.shiftTimeZone)));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3001L, TimeWindowUtil.toUtcTimestampMills((long)3500L, (ZoneId)this.shiftTimeZone)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }
}

