/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.interval;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
import org.apache.flink.table.runtime.operators.join.interval.TimeIntervalStreamJoinTestBase;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class RowTimeIntervalJoinTest
extends TimeIntervalStreamJoinTestBase {
    private int keyIdx = 1;
    private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{this.keyIdx}, this.rowType.toRowFieldTypes());
    private TypeInformation<RowData> keyType = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[0]);

    RowTimeIntervalJoinTest() {
    }

    @Test
    void testRowTimeInnerJoinWithCommonBounds() throws Exception {
        RowTimeIntervalJoin joinProcessFunc = new RowTimeIntervalJoin(FlinkJoinType.INNER, -10L, 20L, 0L, 15L, this.rowType, this.rowType, this.joinFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(1);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k1"));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processElement1(StreamRecordUtils.insertRecord(5L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(15L, "k1"));
        testHarness.processWatermark1(new Watermark(20L));
        testHarness.processWatermark2(new Watermark(20L));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processElement1(StreamRecordUtils.insertRecord(35L, "k1"));
        testHarness.processWatermark1(new Watermark(38L));
        testHarness.processWatermark2(new Watermark(38L));
        testHarness.processElement1(StreamRecordUtils.insertRecord(40L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(6);
        testHarness.processWatermark1(new Watermark(61L));
        testHarness.processWatermark2(new Watermark(61L));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(new Watermark(-19L));
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "k1", 2L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "k1", 2L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5L, "k1", 2L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5L, "k1", 15L, "k1"));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(StreamRecordUtils.insertRecord(35L, "k1", 15L, "k1"));
        expectedOutput.add(new Watermark(18L));
        expectedOutput.add(StreamRecordUtils.insertRecord(40L, "k2", 39L, "k2"));
        expectedOutput.add(new Watermark(41L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testRowTimeInnerJoinWithNegativeBounds() throws Exception {
        RowTimeIntervalJoin joinProcessFunc = new RowTimeIntervalJoin(FlinkJoinType.INNER, -10L, -7L, 0L, 0L, this.rowType, this.rowType, this.joinFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k1"));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        testHarness.processWatermark1(new Watermark(2L));
        testHarness.processWatermark2(new Watermark(2L));
        testHarness.processElement1(StreamRecordUtils.insertRecord(3L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(3L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(13L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(6L, "k1"));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processWatermark1(new Watermark(10L));
        testHarness.processWatermark2(new Watermark(10L));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(2);
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(new Watermark(-9L));
        expectedOutput.add(new Watermark(-8L));
        expectedOutput.add(StreamRecordUtils.insertRecord(3L, "k1", 13L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(6L, "k1", 13L, "k1"));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new Watermark(8L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testRowTimeInnerJoinRealtimeCleanUp() throws Exception {
        RowTimeIntervalJoin joinProcessFunc = new RowTimeIntervalJoin(FlinkJoinType.LEFT, -5L, 9L, 0L, 0L, this.rowType, this.rowType, this.joinFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processWatermark1(new Watermark(7L));
        testHarness.processWatermark2(new Watermark(7L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(1);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(2);
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "k1", null, null));
        expectedOutput.add(new Watermark(-2L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testRowTimeLeftOuterJoin() throws Exception {
        RowTimeIntervalJoin joinProcessFunc = new RowTimeIntervalJoin(FlinkJoinType.LEFT, -5L, 9L, 0L, 7L, this.rowType, this.rowType, this.joinFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(1);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(2);
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k2"));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(0);
        testHarness.processElement1(StreamRecordUtils.insertRecord(19L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(20L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(26L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(25L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(21L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(40L, "k2"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(50L, "k2"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(49L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(41L, "k2"));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "k1", null, null));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "k1", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(20L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(21L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(21L, "k1", 26L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(49L, "k2", 40L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(49L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(50L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(19L, "k1", null, null));
        expectedOutput.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testRowTimeRightOuterJoin() throws Exception {
        RowTimeIntervalJoin joinProcessFunc = new RowTimeIntervalJoin(FlinkJoinType.RIGHT, -5L, 9L, 0L, 7L, this.rowType, this.rowType, this.joinFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(1);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(2);
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k2"));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(0);
        testHarness.processElement1(StreamRecordUtils.insertRecord(19L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(20L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(26L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(25L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(21L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(40L, "k2"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(50L, "k2"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(49L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(41L, "k2"));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.insertRecord(null, null, 1L, "k2"));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.insertRecord(null, null, 2L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(20L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(21L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(21L, "k1", 26L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(49L, "k2", 40L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(49L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(50L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(null, null, 39L, "k2"));
        expectedOutput.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testRowTimeFullOuterJoin() throws Exception {
        RowTimeIntervalJoin joinProcessFunc = new RowTimeIntervalJoin(FlinkJoinType.FULL, -5L, 9L, 0L, 7L, this.rowType, this.rowType, this.joinFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(4);
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(1);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(2);
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k2"));
        Assertions.assertThat((int)testHarness.numKeyedStateEntries()).isEqualTo(0);
        Assertions.assertThat((int)testHarness.numEventTimeTimers()).isEqualTo(0);
        testHarness.processElement1(StreamRecordUtils.insertRecord(19L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(20L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(26L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(25L, "k1"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(21L, "k1"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(40L, "k2"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(50L, "k2"));
        testHarness.processElement1(StreamRecordUtils.insertRecord(49L, "k2"));
        testHarness.processElement2(StreamRecordUtils.insertRecord(41L, "k2"));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "k1", null, null));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.insertRecord(null, null, 1L, "k2"));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "k1", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(null, null, 2L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(20L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(21L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(21L, "k1", 26L, "k1"));
        expectedOutput.add(StreamRecordUtils.insertRecord(49L, "k2", 40L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(49L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(50L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(19L, "k1", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(null, null, 39L, "k2"));
        expectedOutput.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(RowTimeIntervalJoin intervalJoinFunc) throws Exception {
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay((KeyedCoProcessFunction)intervalJoinFunc, intervalJoinFunc.getMaxOutputDelay());
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, (KeySelector)this.keySelector, (KeySelector)this.keySelector, this.keyType);
        return testHarness;
    }
}

