/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class SinkUpsertMaterializerTest {
    private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig((long)1000L);
    private final LogicalType[] types = new LogicalType[]{new BigIntType(), new IntType(), new VarCharType()};
    private final RowDataSerializer serializer = new RowDataSerializer(this.types);
    private final RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{1}, this.types);
    private final GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", "", new Object[0]){

        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser();
        }
    };
    private final GeneratedRecordEqualiser upsertKeyEqualiser = new GeneratedRecordEqualiser("", "", new Object[0]){

        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestUpsertKeyEqualiser();
        }
    };

    @Test
    public void test() throws Exception {
        SinkUpsertMaterializer materializer = new SinkUpsertMaterializer(this.ttlConfig, (TypeSerializer)this.serializer, this.equaliser, this.upsertKeyEqualiser, null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)materializer, (KeySelector)this.keySelector, (TypeInformation)this.keySelector.getProducedType());
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1, "a1"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2L, 1, "a2"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3L, 1, "a3"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(2L, 1, "a2"));
        this.shouldEmitNothing((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness);
        testHarness.processElement(StreamRecordUtils.deleteRecord(3L, 1, "a3"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1L, 1, "a1"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4L, 1, "a4"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(4L, 1, "a4"));
        this.shouldEmitNothing((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness);
        testHarness.close();
    }

    @Test
    public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
        SinkUpsertMaterializer materializer = new SinkUpsertMaterializer(this.ttlConfig, (TypeSerializer)this.serializer, this.equaliser, this.upsertKeyEqualiser, new int[]{0});
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)materializer, (KeySelector)this.keySelector, (TypeInformation)this.keySelector.getProducedType());
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1, "a1"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(1L, 1, "a11"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3L, 1, "a3"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1L, 1, "a111"));
        this.shouldEmitNothing((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness);
        testHarness.processElement(StreamRecordUtils.deleteRecord(3L, 1, "a33"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4L, 1, "a4"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(4L, 1, "a4"));
        this.shouldEmitNothing((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness);
        testHarness.close();
    }

    private void shouldEmitNothing(OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
        Assertions.assertThat(SinkUpsertMaterializerTest.getEmittedRows(harness)).isEmpty();
    }

    private void shouldEmit(OneInputStreamOperatorTestHarness<RowData, RowData> harness, RowData expected) {
        Assertions.assertThat(SinkUpsertMaterializerTest.getEmittedRows(harness)).containsExactly((Object[])new RowData[]{expected});
    }

    private static List<RowData> getEmittedRows(OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
        Object o;
        ArrayList<RowData> rows = new ArrayList<RowData>();
        while ((o = harness.getOutput().poll()) != null) {
            RowData value = (RowData)((StreamRecord)o).getValue();
            GenericRowData newRow = GenericRowData.of((Object[])new Object[]{value.getLong(0), value.getInt(1), value.getString(2)});
            newRow.setRowKind(value.getRowKind());
            rows.add((RowData)newRow);
        }
        return rows;
    }

    private static class TestUpsertKeyEqualiser
    implements RecordEqualiser {
        private TestUpsertKeyEqualiser() {
        }

        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
        }
    }

    private static class TestRecordEqualiser
    implements RecordEqualiser {
        private TestRecordEqualiser() {
        }

        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0) && row1.getInt(1) == row2.getInt(1) && row1.getString(2).equals(row2.getString(2));
        }
    }
}

