/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.groupwindow.operator;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.NavigableSet;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.MergeCallback;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.operator.AggregateWindowOperator;
import org.apache.flink.table.runtime.operators.window.groupwindow.operator.TableAggregateWindowOperator;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Contract tests for {@link AggregateWindowOperator} and {@link TableAggregateWindowOperator}.
 *
 * <p>Every collaborator of the operator (window assigner, trigger and aggregate handler) is a
 * Mockito mock, so each test only checks which collaborator methods the operator invokes, and
 * with which arguments, when a single record is pushed through a keyed test harness.
 *
 * <p>Each test owns its harness through try-with-resources so that the harness is closed even
 * when a verification fails.
 */
public class WindowOperatorContractTest {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    /** The assigner must be consulted exactly once for each processed element. */
    @Test
    public void testAssignerIsInvokedOncePerElement() throws Exception {
        GroupWindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            Mockito.when(mockAssigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Collections.singletonList(new TimeWindow(0L, 0L)));

            testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
            Mockito.verify(mockAssigner, Mockito.times(1))
                    .assignWindows(Matchers.eq(StreamRecordUtils.row("String", 1, 0L)), Matchers.eq(0L));

            // The verification count is cumulative, hence 2 after the second identical record.
            testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));
            Mockito.verify(mockAssigner, Mockito.times(2))
                    .assignWindows(Matchers.eq(StreamRecordUtils.row("String", 1, 0L)), Matchers.eq(0L));
        }
    }

    /**
     * When the assigner returns two windows and the trigger fires on element, the aggregate
     * handler must produce a value once for each of the two windows.
     */
    @Test
    public void testAssignerWithMultipleWindowsForAggregate() throws Exception {
        GroupWindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            Mockito.when(mockAssigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
            shouldFireOnElement(mockTrigger);

            testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));

            Mockito.verify(mockAggregate, Mockito.times(2)).getValue(anyTimeWindow());
            Mockito.verify(mockAggregate, Mockito.times(1)).getValue(Matchers.eq(new TimeWindow(0L, 2L)));
            Mockito.verify(mockAggregate, Mockito.times(1)).getValue(Matchers.eq(new TimeWindow(2L, 4L)));
        }
    }

    /**
     * Table-aggregate counterpart of {@link #testAssignerWithMultipleWindowsForAggregate()}: the
     * handler must emit once for each of the two assigned windows.
     */
    @Test
    public void testAssignerWithMultipleWindowsForTableAggregate() throws Exception {
        GroupWindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceTableAggsHandleFunction<TimeWindow> mockAggregate = mockTableAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            Mockito.when(mockAssigner.assignWindows(Matchers.any(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
            shouldFireOnElement(mockTrigger);

            testHarness.processElement(StreamRecordUtils.insertRecord("String", 1, 0L));

            Mockito.verify(mockAggregate, Mockito.times(2))
                    .emitValue(anyTimeWindow(), Matchers.any(), Matchers.any());
            Mockito.verify(mockAggregate, Mockito.times(1))
                    .emitValue(Matchers.eq(new TimeWindow(0L, 2L)), Matchers.any(), Matchers.any());
            Mockito.verify(mockAggregate, Mockito.times(1))
                    .emitValue(Matchers.eq(new TimeWindow(2L, 4L)), Matchers.any(), Matchers.any());
        }
    }

    /** {@code Trigger.onElement} must be called once for every window the element is assigned to. */
    @Test
    public void testOnElementCalledPerWindow() throws Exception {
        GroupWindowAssigner<TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            Mockito.when(mockAssigner.assignWindows(anyGenericRow(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

            testHarness.processElement(StreamRecordUtils.insertRecord("String", 42, 1L));

            Mockito.verify(mockTrigger)
                    .onElement(
                            Matchers.eq(StreamRecordUtils.row("String", 42, 1L)),
                            Matchers.eq(1L),
                            Matchers.eq(new TimeWindow(2L, 4L)));
            Mockito.verify(mockTrigger)
                    .onElement(
                            Matchers.eq(StreamRecordUtils.row("String", 42, 1L)),
                            Matchers.eq(1L),
                            Matchers.eq(new TimeWindow(0L, 2L)));
            // No further onElement calls beyond the two verified above.
            Mockito.verify(mockTrigger, Mockito.times(2))
                    .onElement(Matchers.any(), Matchers.anyLong(), anyTimeWindow());
        }
    }

    /** A merging assigner must be asked to merge once for each newly assigned window. */
    @Test
    public void testMergeWindowsIsCalled() throws Exception {
        MergingWindowAssigner<TimeWindow> mockAssigner = mockMergingAssigner();
        Trigger<TimeWindow> mockTrigger = mockTrigger();
        NamespaceAggsHandleFunction<TimeWindow> mockAggregate = mockAggsHandleFunction();

        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, mockAggregate, 0L)) {
            testHarness.open();

            Mockito.when(mockAssigner.assignWindows(anyGenericRow(), Matchers.anyLong()))
                    .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

            Assertions.assertThat(testHarness.getOutput()).isEmpty();

            testHarness.processElement(StreamRecordUtils.insertRecord("String", 42, 0L));

            Mockito.verify(mockAssigner)
                    .mergeWindows(Matchers.eq(new TimeWindow(2L, 4L)), Matchers.any(), anyMergeCallback());
            Mockito.verify(mockAssigner)
                    .mergeWindows(Matchers.eq(new TimeWindow(0L, 2L)), Matchers.any(), anyMergeCallback());
            Mockito.verify(mockAssigner, Mockito.times(2))
                    .mergeWindows(anyTimeWindow(), Matchers.any(), anyMergeCallback());
        }
    }

    /**
     * Builds a keyed test harness around a window operator wired with the given collaborators.
     *
     * <p>An {@link AggregateWindowOperator} is created when {@code aggregationsFunction} is a
     * {@link NamespaceAggsHandleFunction}; otherwise a {@link TableAggregateWindowOperator} is
     * created. Records are keyed by input field 0. Retractions are enabled only when
     * {@code allowedLateness} is positive.
     *
     * @param assigner window assigner; also supplies the window serializer
     * @param trigger trigger handed to the operator
     * @param aggregationsFunction aggregate or table-aggregate handler
     * @param allowedLateness allowed lateness passed to the operator
     * @return a harness that has not been opened yet; the caller must open and close it
     */
    // The operators are instantiated as raw types, exactly as before; the unchecked conversions
    // are confined to this factory.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <W extends Window> KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOperator(GroupWindowAssigner<W> assigner, Trigger<W> trigger, NamespaceAggsHandleFunctionBase<W> aggregationsFunction, long allowedLateness) throws Exception {
        LogicalType[] inputTypes = new LogicalType[]{VarCharType.STRING_TYPE, new IntType()};
        RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, inputTypes);
        InternalTypeInfo<RowData> keyType = keySelector.getProducedType();
        LogicalType[] accTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
        LogicalType[] outputTypeWithoutKeys = new LogicalType[]{new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()};
        boolean sendRetraction = allowedLateness > 0L;

        // NOTE(review): the literals 2 and -1 are positional constructor arguments whose meaning
        // is not visible in this file (presumably rowtime index and input-count index) - confirm
        // against the operator constructors.
        OneInputStreamOperator<RowData, RowData> operator;
        if (aggregationsFunction instanceof NamespaceAggsHandleFunction) {
            operator = new AggregateWindowOperator((NamespaceAggsHandleFunction) aggregationsFunction, Mockito.mock(RecordEqualiser.class), assigner, trigger, assigner.getWindowSerializer(new ExecutionConfig()), inputTypes, outputTypeWithoutKeys, accTypes, windowTypes, 2, sendRetraction, allowedLateness, UTC_ZONE_ID, -1);
        } else {
            operator = new TableAggregateWindowOperator((NamespaceTableAggsHandleFunction) aggregationsFunction, assigner, trigger, assigner.getWindowSerializer(new ExecutionConfig()), inputTypes, outputTypeWithoutKeys, accTypes, windowTypes, 2, sendRetraction, allowedLateness, UTC_ZONE_ID, -1);
        }
        return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
    }

    /** Creates an unstubbed mock aggregate handler. */
    private static <W extends Window> NamespaceAggsHandleFunction<W> mockAggsHandleFunction() throws Exception {
        @SuppressWarnings("unchecked")
        NamespaceAggsHandleFunction<W> aggregator = Mockito.mock(NamespaceAggsHandleFunction.class);
        return aggregator;
    }

    /** Creates a mock table-aggregate handler whose accumulators are an empty row. */
    private static <W extends Window> NamespaceTableAggsHandleFunction<W> mockTableAggsHandleFunction() throws Exception {
        @SuppressWarnings("unchecked")
        NamespaceTableAggsHandleFunction<W> tableAggWindowAggregator = Mockito.mock(NamespaceTableAggsHandleFunction.class);
        Mockito.when(tableAggWindowAggregator.getAccumulators()).thenReturn(GenericRowData.of());
        return tableAggWindowAggregator;
    }

    /** Creates a mock trigger that never fires, whichever callback is invoked. */
    private <W extends Window> Trigger<W> mockTrigger() throws Exception {
        @SuppressWarnings("unchecked")
        Trigger<W> mockTrigger = Mockito.mock(Trigger.class);
        Mockito.when(mockTrigger.onElement(Matchers.any(), Matchers.anyLong(), Matchers.any())).thenReturn(false);
        Mockito.when(mockTrigger.onEventTime(Matchers.anyLong(), Matchers.any())).thenReturn(false);
        Mockito.when(mockTrigger.onProcessingTime(Matchers.anyLong(), Matchers.any())).thenReturn(false);
        return mockTrigger;
    }

    /** Typed "any" matcher so call sites resolve the argument as a {@link TimeWindow}. */
    private static TimeWindow anyTimeWindow() {
        return Mockito.any();
    }

    /** Typed "any" matcher so call sites resolve the argument as a {@link GenericRowData}. */
    private static GenericRowData anyGenericRow() {
        return Mockito.any();
    }

    /** Creates a mock event-time assigner that reports the {@link TimeWindow} serializer. */
    private static GroupWindowAssigner<TimeWindow> mockTimeWindowAssigner() throws Exception {
        @SuppressWarnings("unchecked")
        GroupWindowAssigner<TimeWindow> mockAssigner = Mockito.mock(GroupWindowAssigner.class);
        Mockito.when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new TimeWindow.Serializer());
        Mockito.when(mockAssigner.isEventTime()).thenReturn(true);
        return mockAssigner;
    }

    /** Creates a mock event-time merging assigner that reports the {@link TimeWindow} serializer. */
    private static MergingWindowAssigner<TimeWindow> mockMergingAssigner() throws Exception {
        @SuppressWarnings("unchecked")
        MergingWindowAssigner<TimeWindow> mockAssigner = Mockito.mock(MergingWindowAssigner.class);
        Mockito.when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new TimeWindow.Serializer());
        Mockito.when(mockAssigner.isEventTime()).thenReturn(true);
        return mockAssigner;
    }

    /** Typed "any" matcher for the merge callback argument of {@code mergeWindows}. */
    private static MergeCallback<TimeWindow, Collection<TimeWindow>> anyMergeCallback() {
        return Mockito.any();
    }

    /** Re-stubs the trigger so that {@code onElement} reports "fire" for any element and window. */
    private static <T> void shouldFireOnElement(Trigger<TimeWindow> mockTrigger) throws Exception {
        Mockito.when(mockTrigger.onElement(Matchers.any(), Matchers.anyLong(), anyTimeWindow())).thenReturn(true);
    }
}

