/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.over.NonBufferOverWindowOperator;
import org.apache.flink.table.runtime.operators.over.SumAggsHandleFunction;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

class NonBufferOverWindowOperatorTest {
    static GeneratedAggsHandleFunction function = new GeneratedAggsHandleFunction("Function1", "", new Object[0]){

        public AggsHandleFunction newInstance(ClassLoader classLoader) {
            return new SumAggsHandleFunction(1);
        }
    };
    static GeneratedRecordComparator comparator = new GeneratedRecordComparator("Comparator", "", new Object[0]){

        public RecordComparator newInstance(ClassLoader classLoader) {
            return new IntRecordComparator();
        }
    };
    static RowType inputType = RowType.of((LogicalType[])new LogicalType[]{new IntType(), new BigIntType(), new BigIntType()});
    static RowDataSerializer inputSer = new RowDataSerializer(inputType);
    private static GeneratedAggsHandleFunction[] functions;
    private NonBufferOverWindowOperator operator;
    private List<GenericRowData> collect;

    NonBufferOverWindowOperatorTest() {
    }

    @BeforeEach
    void before() throws Exception {
        this.collect = new ArrayList<GenericRowData>();
    }

    @Test
    void testNormal() throws Exception {
        this.test(new boolean[]{false, false}, new GenericRowData[]{GenericRowData.of((Object[])new Object[]{0, 1L, 4L, 1L, 4L}), GenericRowData.of((Object[])new Object[]{0, 1L, 1L, 2L, 5L}), GenericRowData.of((Object[])new Object[]{1, 5L, 2L, 5L, 2L}), GenericRowData.of((Object[])new Object[]{2, 5L, 4L, 5L, 4L}), GenericRowData.of((Object[])new Object[]{2, 6L, 2L, 11L, 6L})});
    }

    @Test
    void testResetAccumulators() throws Exception {
        this.test(new boolean[]{true, false}, new GenericRowData[]{GenericRowData.of((Object[])new Object[]{0, 1L, 4L, 1L, 4L}), GenericRowData.of((Object[])new Object[]{0, 1L, 1L, 1L, 5L}), GenericRowData.of((Object[])new Object[]{1, 5L, 2L, 5L, 2L}), GenericRowData.of((Object[])new Object[]{2, 5L, 4L, 5L, 4L}), GenericRowData.of((Object[])new Object[]{2, 6L, 2L, 6L, 6L})});
    }

    private void test(boolean[] resetAccumulators, GenericRowData[] expect) throws Exception {
        MockEnvironment env = new MockEnvironmentBuilder().build();
        StreamTask<Object, StreamOperator<Object>> task = new StreamTask<Object, StreamOperator<Object>>((Environment)env){

            protected void init() {
            }
        };
        StreamConfig streamConfig = (StreamConfig)Mockito.mock(StreamConfig.class);
        Mockito.when((Object)streamConfig.getTypeSerializerIn1(Thread.currentThread().getContextClassLoader())).thenReturn((Object)inputSer);
        Mockito.when((Object)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot((ManagedMemoryUseCase)ArgumentMatchers.eq((Object)ManagedMemoryUseCase.OPERATOR), (Configuration)ArgumentMatchers.any(Configuration.class), (Configuration)ArgumentMatchers.any(Configuration.class), (ClassLoader)ArgumentMatchers.any(ClassLoader.class))).thenReturn((Object)0.99);
        Mockito.when((Object)streamConfig.getOperatorID()).thenReturn((Object)new OperatorID());
        this.operator = new NonBufferOverWindowOperator(new StreamOperatorParameters((StreamTask)task, streamConfig, (Output)new ConsumerOutput(r -> this.collect.add(GenericRowData.of((Object[])new Object[]{r.getInt(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4)}))), TestProcessingTimeService::new, null, null), functions, comparator, resetAccumulators){

            public StreamingRuntimeContext getRuntimeContext() {
                return (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
            }
        };
        this.operator.open();
        this.addRow(0, 1L, 4L);
        this.addRow(0, 1L, 1L);
        this.addRow(1, 5L, 2L);
        this.addRow(2, 5L, 4L);
        this.addRow(2, 6L, 2L);
        Object[] outputs = this.collect.toArray(new GenericRowData[0]);
        Assertions.assertThat((Object[])outputs).isEqualTo((Object)expect);
    }

    private void addRow(Object ... fields) throws Exception {
        this.operator.processElement(new StreamRecord((Object)GenericRowData.of((Object[])fields)));
    }

    static {
        GeneratedAggsHandleFunction function2 = new GeneratedAggsHandleFunction("Function2", "", new Object[0]){

            public AggsHandleFunction newInstance(ClassLoader classLoader) {
                return new SumAggsHandleFunction(2);
            }
        };
        functions = new GeneratedAggsHandleFunction[]{function, function2};
    }

    static class ConsumerOutput
    implements Output<StreamRecord<RowData>> {
        private final Consumer<RowData> consumer;

        public ConsumerOutput(Consumer<RowData> consumer) {
            this.consumer = consumer;
        }

        public void emitWatermark(Watermark mark) {
            throw new RuntimeException();
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            throw new RuntimeException();
        }

        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            throw new RuntimeException();
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            throw new RuntimeException();
        }

        public void emitRecordAttributes(RecordAttributes recordAttributes) {
            throw new RuntimeException();
        }

        public void collect(StreamRecord<RowData> record) {
            this.consumer.accept((RowData)record.getValue());
        }

        public void close() {
        }
    }
}

