/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTestBase;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
import org.apache.flink.table.runtime.operators.join.String2HashJoinOperatorTest;
import org.apache.flink.table.runtime.operators.sort.StringNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.StringRecordComparator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized test that runs a {@link SortMergeJoinOperator} over two string-typed inputs inside
 * a {@link TwoInputStreamTaskTestHarness} and compares the emitted records with a fixed expected
 * sequence for the INNER, LEFT, RIGHT and FULL join types.
 *
 * <p>Every test is executed once per value returned by {@link #parameters()}; that value is
 * forwarded as the {@code leftIsSmaller} argument of {@link #newOperator(FlinkJoinType, boolean)}.
 */
@RunWith(Parameterized.class)
public class String2SortMergeJoinOperatorTest {
    /** Test parameter handed to the join function when the operator is created. */
    private final boolean leftIsSmall;

    /** Type information of each join input: two string fields. */
    final InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.ofFields(new LogicalType[]{VarCharType.STRING_TYPE, VarCharType.STRING_TYPE});

    /** Type information passed to the harness as output type: four string fields. */
    private final InternalTypeInfo<RowData> joinedInfo = InternalTypeInfo.ofFields(new LogicalType[]{VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, VarCharType.STRING_TYPE});

    /**
     * Creates the test instance for one parameter value.
     *
     * @param leftIsSmall value supplied by the {@link Parameterized} runner
     */
    public String2SortMergeJoinOperatorTest(boolean leftIsSmall) {
        this.leftIsSmall = leftIsSmall;
    }

    /**
     * Supplies the parameter values the test class is run with.
     *
     * @return {@code true} and {@code false}
     */
    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    /** Checks the records emitted for {@link FlinkJoinType#INNER}. */
    @Test
    public void testInnerJoin() throws Exception {
        assertJoinOutput(FlinkJoinType.INNER, expected("a", "02"), expected("b", "14"));
    }

    /** Checks the records emitted for {@link FlinkJoinType#LEFT}. */
    @Test
    public void testLeftOuterJoin() throws Exception {
        assertJoinOutput(FlinkJoinType.LEFT, expected("a", "02"), expected("b", "14"), expected("d", "0null"));
    }

    /** Checks the records emitted for {@link FlinkJoinType#RIGHT}. */
    @Test
    public void testRightOuterJoin() throws Exception {
        assertJoinOutput(FlinkJoinType.RIGHT, expected("a", "02"), expected("b", "14"), expected("c", "2null"));
    }

    /** Checks the records emitted for {@link FlinkJoinType#FULL}. */
    @Test
    public void testFullJoin() throws Exception {
        assertJoinOutput(FlinkJoinType.FULL, expected("a", "02"), expected("b", "14"), expected("c", "2null"), expected("d", "0null"));
    }

    /**
     * Runs the shared join scenario for the given join type and asserts that the harness output,
     * after {@code String2HashJoinOperatorTest.transformToBinary}, equals the expected records in
     * the given order.
     *
     * @param joinType join type the operator is created with
     * @param expectedRecords records expected in the output, in order
     * @throws Exception if the harness fails while running the task
     */
    private void assertJoinOutput(FlinkJoinType joinType, Object... expectedRecords) throws Exception {
        StreamOperator joinOperator = newOperator(joinType, leftIsSmall);
        TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> testHarness = buildSortMergeJoin(joinOperator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(Arrays.asList(expectedRecords));
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, String2HashJoinOperatorTest.transformToBinary(testHarness.getOutput()));
    }

    /**
     * Wraps the row built by {@code String2HashJoinOperatorTest.newRow(key, value)} in a
     * timestamp-less {@link StreamRecord}.
     *
     * @param key first field handed to {@code newRow}
     * @param value second field handed to {@code newRow}
     * @return the record to place in the expected-output queue
     */
    private static Object expected(String key, String value) {
        return new StreamRecord<>(String2HashJoinOperatorTest.newRow(key, value));
    }

    /**
     * Builds a two-input task harness around {@code operator}, starts the task, feeds the fixed
     * set of six input rows and signals end of input.
     *
     * @param operator operator installed into the harness' stream config
     * @return the harness, with all input processed and input ended; the caller still has to wait
     *     for task completion before reading the output
     * @throws Exception if starting the task or processing the input fails
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> buildSortMergeJoin(StreamOperator operator) throws Exception {
        // The type infos are InternalTypeInfo<RowData> while the declared harness type is over
        // BinaryRowData / JoinedRowData, so the harness is created through its raw type.
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, typeInfo, typeInfo, joinedInfo);
        testHarness.memorySize = 36L * 1024 * 1024;
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getStreamConfig().setStreamOperator(operator);
        testHarness.getStreamConfig().setOperatorID(new OperatorID());
        testHarness.getStreamConfig().setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        final long initialTime = 0L;
        // NOTE(review): the two trailing ints are presumably (input gate, channel) - confirm
        // against TwoInputStreamTaskTestHarness#processElement.
        testHarness.processElement(new StreamRecord<>(String2HashJoinOperatorTest.newRow("a", "0"), initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<>(String2HashJoinOperatorTest.newRow("d", "0"), initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<>(String2HashJoinOperatorTest.newRow("a", "2"), initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<>(String2HashJoinOperatorTest.newRow("b", "1"), initialTime), 0, 1);
        testHarness.processElement(new StreamRecord<>(String2HashJoinOperatorTest.newRow("c", "2"), initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<>(String2HashJoinOperatorTest.newRow("b", "4"), initialTime), 1, 0);

        testHarness.waitForInputProcessing();
        testHarness.endInput();
        return testHarness;
    }

    /**
     * Creates a {@link SortMergeJoinOperator} for string rows.
     *
     * <p>Sort and spill settings are taken from the default values of the corresponding
     * {@link ExecutionConfigOptions}. The generated-code arguments of the join function are
     * replaced by stubs whose {@code newInstance} returns hand-written implementations.
     *
     * @param type join type passed to the join function
     * @param leftIsSmaller flag passed to the join function
     * @return the operator wrapping the configured {@link SortMergeJoinFunction}
     */
    static StreamOperator newOperator(FlinkJoinType type, boolean leftIsSmaller) {
        int maxNumFileHandles = ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
        boolean compressionEnable = ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
        int compressionBlockSize = (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes();
        boolean asyncMergeEnable = ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();

        GeneratedJoinCondition condition = new GeneratedJoinCondition("", "", new Object[0]) {
            @Override
            public JoinCondition newInstance(ClassLoader classLoader) {
                return new Int2HashJoinOperatorTestBase.TrueCondition();
            }
        };

        // Argument order mirrors the original call: condition, two projections, then
        // (key computer, comparator) twice, a third comparator and the boolean array.
        SortMergeJoinFunction sortMergeJoinFunction = new SortMergeJoinFunction(
                0.0,
                type,
                leftIsSmaller,
                maxNumFileHandles,
                compressionEnable,
                compressionBlockSize,
                asyncMergeEnable,
                condition,
                stringProjection(),
                stringProjection(),
                stringKeyComputer(),
                stringComparator(),
                stringKeyComputer(),
                stringComparator(),
                stringComparator(),
                new boolean[]{true});
        return new SortMergeJoinOperator(sortMergeJoinFunction);
    }

    /** Returns a new stub whose {@code newInstance} yields a {@code MyProjection}. */
    private static GeneratedProjection stringProjection() {
        return new GeneratedProjection("", "", new Object[0]) {
            @Override
            public Projection newInstance(ClassLoader classLoader) {
                return new String2HashJoinOperatorTest.MyProjection();
            }
        };
    }

    /** Returns a new stub whose {@code newInstance} yields a {@link StringNormalizedKeyComputer}. */
    private static GeneratedNormalizedKeyComputer stringKeyComputer() {
        return new GeneratedNormalizedKeyComputer("", "") {
            @Override
            public NormalizedKeyComputer newInstance(ClassLoader classLoader) {
                return new StringNormalizedKeyComputer();
            }
        };
    }

    /** Returns a new stub whose {@code newInstance} yields a {@link StringRecordComparator}. */
    private static GeneratedRecordComparator stringComparator() {
        return new GeneratedRecordComparator("", "", new Object[0]) {
            @Override
            public RecordComparator newInstance(ClassLoader classLoader) {
                return new StringRecordComparator();
            }
        };
    }
}

