/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImplTest;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshot;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class InternalTimerServiceAsyncImplTest {
    // The collaborators below are assigned in setup() before every test.
    private AsyncExecutionController<String> asyncExecutionController;
    private TestKeyContext keyContext;
    private TestProcessingTimeService processingTimeService;
    private InternalTimerServiceAsyncImpl<Integer, String> service;
    private KeyGroupRange testKeyGroupList;

    /**
     * Exception handler passed to the {@link AsyncExecutionController}; it rethrows every
     * reported failure as a {@link RuntimeException} that keeps the message and the cause.
     */
    private final StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler =
            new StateFutureImpl.AsyncFrameworkExceptionHandler() {

                @Override
                public void handleException(String message, Throwable exception) {
                    throw new RuntimeException(message, exception);
                }
            };

    // Explicit package-private no-argument constructor; it performs no initialization itself.
    InternalTimerServiceAsyncImplTest() {
    }

    /**
     * Creates a fresh controller, key context, processing-time service (clock set to 0) and
     * timer service before each test, and resets the static trigger counters of the test
     * triggerables.
     *
     * @throws Exception if any of the collaborators fails to initialize
     */
    @BeforeEach
    void setup() throws Exception {
        asyncExecutionController =
                new AsyncExecutionController<>(
                        new SyncMailboxExecutor(),
                        exceptionHandler,
                        new MockStateExecutor(),
                        new DeclarationManager(),
                        128,
                        2,
                        1000L,
                        10,
                        null,
                        null);

        final int totalKeyGroups = 128;
        testKeyGroupList = new KeyGroupRange(0, totalKeyGroups - 1);
        keyContext = new TestKeyContext();
        processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(0L);

        HeapPriorityQueueSetFactory factory =
                new HeapPriorityQueueSetFactory(testKeyGroupList, totalKeyGroups, 128);
        service =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
                                .getIOMetricGroup(),
                        testKeyGroupList,
                        keyContext,
                        processingTimeService,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        factory,
                        asyncExecutionController);

        // The counters are static, so they outlive a single test instance and must be zeroed
        // here. SameTimerTriggerable is included because testSameKeyEventTimerFireOrder asserts
        // that its counter starts at 0.
        TestTriggerable.processingTriggerCount = 0;
        TestTriggerable.eventTriggerCount = 0;
        SameTimerTriggerable.eventTriggerCount = 0;
    }

    /**
     * Registers two processing-time timers with different namespaces but the same timestamp (1)
     * under one key, and expects neither to have fired before the clock moves and both to have
     * fired after {@code advance(1L)}.
     */
    @Test
    void testTimerWithSameKey() throws Exception {
        keyContext.setCurrentKey("key-1");
        // Namespaces are passed as plain Strings: the service is typed
        // InternalTimerServiceAsyncImpl<Integer, String>.
        service.registerProcessingTimeTimer("processing-timer-1", 1L);
        service.registerProcessingTimeTimer("processing-timer-2", 1L);

        TestTriggerable triggerable = new TestTriggerable();
        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(0);

        processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(2);
    }

    /**
     * Checks when processing-time timers fire relative to a record that is still in flight.
     *
     * <p>A timer for "key-1" is expected to fire as soon as the clock reaches it. A second timer
     * for "key-2" is expected NOT to fire while a record context for "key-2" with a submitted
     * request is still held, and to fire only after that context has been released and the clock
     * is advanced again.
     */
    @Test
    void testProcessingTimerFireOrder() throws Exception {
        keyContext.setCurrentKey("key-1");
        service.registerProcessingTimeTimer("processing-timer-1", 1L);

        TestTriggerable triggerable = new TestTriggerable();
        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(0);

        processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(1);

        keyContext.setCurrentKey("key-2");
        service.registerProcessingTimeTimer("processing-timer-2", 2L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(1);

        // Occupy "key-2" with a record that has an outstanding state request.
        RecordContext<String> recordContext =
                asyncExecutionController.buildContext("record2", "key-2");
        asyncExecutionController.setCurrentContext(recordContext);
        asyncExecutionController.handleRequest(null, StateRequestType.VALUE_GET, null);

        // The timer is due now, but the count is expected to stay unchanged.
        processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(1);

        // After the record context is released the timer is expected to fire.
        recordContext.release();
        processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(2);
    }

    /**
     * Event-time counterpart of the processing-time fire-order test.
     *
     * <p>A timer for "key-1" is expected to fire when the watermark reaches 1. A second timer for
     * "key-2" at timestamp 2 is expected NOT to fire on watermark 2 while a record context for
     * "key-2" with a submitted request is still held, and to fire on watermark 3 after that
     * context has been released.
     */
    @Test
    void testEventTimerFireOrder() throws Exception {
        keyContext.setCurrentKey("key-1");
        service.registerEventTimeTimer("event-timer-1", 1L);

        TestTriggerable triggerable = new TestTriggerable();
        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(0);

        service.advanceWatermark(1L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(1);

        keyContext.setCurrentKey("key-2");
        service.registerEventTimeTimer("event-timer-2", 2L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(1);

        // Occupy "key-2" with a record that has an outstanding state request.
        RecordContext<String> recordContext =
                asyncExecutionController.buildContext("record2", "key-2");
        asyncExecutionController.setCurrentContext(recordContext);
        asyncExecutionController.handleRequest(null, StateRequestType.VALUE_GET, null);

        // The timer is due at watermark 2, but the count is expected to stay unchanged.
        service.advanceWatermark(2L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(1);

        // After the record context is released the timer is expected to fire.
        recordContext.release();
        service.advanceWatermark(3L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(2);
    }

    /**
     * Fires event-time timers through a {@link SameTimerTriggerable}, whose callback itself
     * builds a record context and issues a synchronous request on the controller.
     *
     * <p>Expects one trigger after watermark 1, then two more after watermark 3 (timers at 2 and
     * 3), with no in-flight records left on the controller after each watermark advance.
     */
    @Test
    void testSameKeyEventTimerFireOrder() throws Exception {
        keyContext.setCurrentKey("key-1");
        service.registerEventTimeTimer("event-timer-1", 1L);

        SameTimerTriggerable triggerable = new SameTimerTriggerable(asyncExecutionController);
        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        Assertions.assertThat(SameTimerTriggerable.eventTriggerCount).isEqualTo(0);

        service.advanceWatermark(1L);
        Assertions.assertThat(SameTimerTriggerable.eventTriggerCount).isEqualTo(1);
        Assertions.assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);

        keyContext.setCurrentKey("key-1");
        service.registerEventTimeTimer("event-timer-2", 2L);
        service.registerEventTimeTimer("event-timer-3", 3L);
        // Registering alone must not trigger anything.
        Assertions.assertThat(SameTimerTriggerable.eventTriggerCount).isEqualTo(1);

        service.advanceWatermark(3L);
        Assertions.assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
        Assertions.assertThat(SameTimerTriggerable.eventTriggerCount).isEqualTo(3);
    }

    /**
     * Round-trips the timers through a per-key-group snapshot.
     *
     * <p>Registers one processing-time and one event-time timer for each of two distinct integer
     * keys, writes a version-2 snapshot for every key group, restores it into a new timer service
     * backed by a new processing-time service, and expects all four timers to fire once the clock
     * and the watermark reach 10.
     */
    @Test
    void testSnapshotAndRestore() throws Exception {
        service.startTimerService(
                IntSerializer.INSTANCE, StringSerializer.INSTANCE, new TestTriggerable());

        // Pick two different keys that fall into the tested key-group range.
        final int numKeyGroups = testKeyGroupList.getNumberOfKeyGroups();
        int key1 = getKeyInKeyGroupRange(testKeyGroupList, numKeyGroups);
        int key2 = getKeyInKeyGroupRange(testKeyGroupList, numKeyGroups);
        while (key2 == key1) {
            key2 = getKeyInKeyGroupRange(testKeyGroupList, numKeyGroups);
        }

        keyContext.setCurrentKey(key1);
        service.registerProcessingTimeTimer("ciao", 10L);
        service.registerEventTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        service.registerEventTimeTimer("ciao", 10L);
        service.registerProcessingTimeTimer("hello", 10L);

        Assertions.assertThat(service.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(service.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(service.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(service.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat(service.numEventTimeTimers("hello")).isOne();
        Assertions.assertThat(service.numEventTimeTimers("ciao")).isOne();

        // Serialize the timers of every key group into its own byte array.
        final int snapshotVersion = 2;
        Map<Integer, byte[]> snapshot = new HashMap<>();
        for (Integer keyGroupIndex : testKeyGroupList) {
            try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
                InternalTimersSnapshot<Integer, String> timersSnapshot =
                        service.snapshotTimersForKeyGroup(keyGroupIndex);
                InternalTimersSnapshotReaderWriters.getWriterForVersion(
                                snapshotVersion,
                                timersSnapshot,
                                service.getKeySerializer(),
                                service.getNamespaceSerializer())
                        .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream));
                snapshot.put(keyGroupIndex, outStream.toByteArray());
            }
        }

        // Restore into a new service and start counting from zero again.
        TestTriggerable restoredTriggerable = new TestTriggerable();
        TestTriggerable.eventTriggerCount = 0;
        TestTriggerable.processingTriggerCount = 0;
        processingTimeService = new TestProcessingTimeService();
        service =
                restoreTimerService(
                        snapshot,
                        snapshotVersion,
                        restoredTriggerable,
                        keyContext,
                        processingTimeService);

        processingTimeService.setCurrentTime(10L);
        service.advanceWatermark(10L);

        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(2);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(2);
        Assertions.assertThat(service.numEventTimeTimers()).isZero();
    }

    /**
     * Creates a new timer service, feeds it the serialized timers of every key group present in
     * {@code state}, and then starts it with the given triggerable.
     *
     * @param state serialized timers snapshot per key-group index; key groups without an entry
     *     are skipped
     * @param snapshotVersion version used to select the snapshot reader
     * @param triggerable target that is started with the restored service
     * @param keyContext key context handed to the new service
     * @param processingTimeService processing-time service handed to the new service
     * @return the restored and started timer service
     * @throws Exception if reading a snapshot or restoring timers fails
     */
    private InternalTimerServiceAsyncImpl<Integer, String> restoreTimerService(
            Map<Integer, byte[]> state,
            int snapshotVersion,
            Triggerable<Integer, String> triggerable,
            KeyContext keyContext,
            ProcessingTimeService processingTimeService)
            throws Exception {
        InternalTimerServiceAsyncImpl<Integer, String> restoredService =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
                                .getIOMetricGroup(),
                        testKeyGroupList,
                        keyContext,
                        processingTimeService,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        new HeapPriorityQueueSetFactory(
                                testKeyGroupList, testKeyGroupList.getNumberOfKeyGroups(), 128),
                        asyncExecutionController);

        // Use this test's own class loader rather than reaching into an unrelated test class.
        final ClassLoader classLoader = InternalTimerServiceAsyncImplTest.class.getClassLoader();

        for (Integer keyGroupIndex : testKeyGroupList) {
            if (!state.containsKey(keyGroupIndex)) {
                continue;
            }
            try (ByteArrayInputStream inputStream =
                    new ByteArrayInputStream(state.get(keyGroupIndex))) {
                InternalTimersSnapshot<?, ?> restoredTimersSnapshot =
                        InternalTimersSnapshotReaderWriters.getReaderForVersion(
                                        snapshotVersion, classLoader)
                                .readTimersSnapshot(new DataInputViewStreamWrapper(inputStream));
                restoredService.restoreTimersForKeyGroup(restoredTimersSnapshot, keyGroupIndex);
            }
        }

        // Timers are restored first; the service is started only afterwards.
        restoredService.startTimerService(
                IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return restoredService;
    }

    /**
     * Builds an {@link InternalTimerServiceAsyncImpl} whose processing-time and event-time timer
     * queues are created by {@code priorityQueueSetFactory}, and wires it to the given controller
     * via {@code setup}.
     *
     * @param taskIOMetricGroup metric group passed to the service
     * @param keyGroupsList key-group range passed to the service
     * @param keyContext key context passed to the service
     * @param processingTimeService processing-time service passed to the service
     * @param keySerializer serializer for timer keys
     * @param namespaceSerializer serializer for timer namespaces
     * @param priorityQueueSetFactory factory used to create both timer queues
     * @param asyncExecutionController controller handed to the service's {@code setup}
     * @return the newly created timer service (not yet started)
     */
    private static <K, N> InternalTimerServiceAsyncImpl<K, N> createInternalTimerService(
            TaskIOMetricGroup taskIOMetricGroup,
            KeyGroupRange keyGroupsList,
            KeyContext keyContext,
            ProcessingTimeService processingTimeService,
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            PriorityQueueSetFactory priorityQueueSetFactory,
            AsyncExecutionController asyncExecutionController) {
        TimerSerializer<K, N> timerSerializer =
                new TimerSerializer<>(keySerializer, namespaceSerializer);

        InternalTimerServiceAsyncImpl<K, N> serviceAsync =
                new InternalTimerServiceAsyncImpl<>(
                        taskIOMetricGroup,
                        keyGroupsList,
                        keyContext,
                        processingTimeService,
                        priorityQueueSetFactory.create(
                                "__async_processing_timers", timerSerializer),
                        priorityQueueSetFactory.create("__async_event_timers", timerSerializer),
                        StreamTaskCancellationContext.alwaysRunning());
        // The controller parameter is a raw type, so this call is an unchecked conversion.
        serviceAsync.setup(asyncExecutionController);
        return serviceAsync;
    }

    /**
     * Draws random integers until one is assigned to a key group inside {@code range}.
     *
     * @param range key-group range the returned key must map into
     * @param maxParallelism max parallelism used for the key-group assignment
     * @return a random key whose key group is contained in {@code range}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
        // Not seeded with System.currentTimeMillis(): two calls within the same millisecond
        // would then produce the same key, forcing callers that need distinct keys to spin
        // until the clock ticks. The no-arg constructor picks a distinct seed per instance.
        Random rand = new Random();
        int candidate;
        do {
            candidate = rand.nextInt();
        } while (!range.contains(
                KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism)));
        return candidate;
    }

    private static class TestKeyContext
    implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object key) {
            this.key = key;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }

    private static class TestTriggerable
    implements Triggerable<Integer, String> {
        private static int eventTriggerCount = 0;
        private static int processingTriggerCount = 0;

        private TestTriggerable() {
        }

        public void onEventTime(InternalTimer<Integer, String> timer) throws Exception {
            ++eventTriggerCount;
        }

        public void onProcessingTime(InternalTimer<Integer, String> timer) throws Exception {
            ++processingTriggerCount;
        }
    }

    private static class SameTimerTriggerable
    implements Triggerable<Integer, String> {
        private AsyncExecutionController aec;
        private static int eventTriggerCount = 0;

        public SameTimerTriggerable(AsyncExecutionController aec) {
            this.aec = aec;
        }

        public void onEventTime(InternalTimer<Integer, String> timer) throws Exception {
            RecordContext recordContext = this.aec.buildContext((Object)"record", (Object)"key");
            this.aec.setCurrentContext(recordContext);
            this.aec.handleRequestSync(null, StateRequestType.SYNC_POINT, null);
            ++eventTriggerCount;
        }

        public void onProcessingTime(InternalTimer<Integer, String> timer) throws Exception {
        }
    }
}

