/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream;

import java.util.function.Predicate;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperatorTestBase;
import org.apache.flink.table.runtime.operators.join.stream.StreamingSemiAntiJoinOperator;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

public class StreamingSemiAntiJoinOperatorTest
extends StreamingJoinOperatorTestBase {
    private static final Predicate<String> ANTI_JOIN_CHECKER = testDisplayName -> testDisplayName.contains("Anti");

    protected StreamingSemiAntiJoinOperator createJoinOperator(TestInfo testInfo) {
        Long[] ttl = (Long[])STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags());
        return new StreamingSemiAntiJoinOperator(ANTI_JOIN_CHECKER.test(testInfo.getDisplayName()), this.leftTypeInfo, this.rightTypeInfo, this.joinCondition, this.leftInputSpec, this.rightInputSpec, new boolean[]{true}, ttl[0].longValue(), ttl[1].longValue());
    }

    @Override
    protected RowType getOutputType() {
        return this.leftTypeInfo.toRowType();
    }

    @Tags(value={@Tag(value="leftStateRetentionTime=4000"), @Tag(value="rightStateRetentionTime=1000")})
    @Test
    public void testLeftSemiJoinWithDifferentStateRetentionTime() throws Exception {
        this.testHarness.setStateTtlProcessingTime(1L);
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.setStateTtlProcessingTime(3001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#2", "AIR"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.processElement2(StreamRecordUtils.updateAfterRecord("LineOrd#2", "TRUCK"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.processElement2(StreamRecordUtils.deleteRecord("LineOrd#2", "TRUCK"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.processElement2(StreamRecordUtils.deleteRecord("LineOrd#2", "AIR"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(4001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#1", "SHIP"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.setStateTtlProcessingTime(5001L);
        this.testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "7238 Marsh St., Birmingham, AL 35209"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
    }

    @Test
    public void testLeftSemiJoinWithStateRetentionDisabled() throws Exception {
        this.testHarness.setStateTtlProcessingTime(1L);
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.setStateTtlProcessingTime(3001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#2", "AIR"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.processElement2(StreamRecordUtils.updateAfterRecord("LineOrd#2", "TRUCK"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.processElement2(StreamRecordUtils.deleteRecord("LineOrd#2", "TRUCK"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.processElement2(StreamRecordUtils.deleteRecord("LineOrd#2", "AIR"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(4001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#1", "SHIP"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(5001L);
        this.testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "7238 Marsh St., Birmingham, AL 35209"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, "Ord#1", "LineOrd#1", "7238 Marsh St., Birmingham, AL 35209"));
    }

    @Tags(value={@Tag(value="leftStateRetentionTime=4000"), @Tag(value="rightStateRetentionTime=1000")})
    @Test
    public void testLeftAntiJoinWithDifferentStateRetentionTime() throws Exception {
        this.testHarness.setStateTtlProcessingTime(1L);
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(3001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#2", "AIR"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(4001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#1", "RAIL"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
        this.testHarness.setStateTtlProcessingTime(5001L);
        this.testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "23 W. River Avenue, Port Orange, FL 32127"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, "Ord#1", "LineOrd#1", "23 W. River Avenue, Port Orange, FL 32127"));
    }

    @Test
    public void testLeftAntiJoinWithStateRetentionTimeDisabled() throws Exception {
        this.testHarness.setStateTtlProcessingTime(1L);
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(3001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#2", "AIR"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(4001L);
        this.testHarness.processElement2(StreamRecordUtils.insertRecord("LineOrd#1", "RAIL"));
        this.assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        this.testHarness.setStateTtlProcessingTime(5001L);
        this.testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "23 W. River Avenue, Port Orange, FL 32127"));
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
    }
}

