/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SourceCoordinatorAlignmentTest
extends SourceCoordinatorTestBase {
    SourceCoordinatorAlignmentTest() {
    }

    @Test
    void testWatermarkAlignment() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry();){
            SourceCoordinator<?, ?> sourceCoordinator1 = this.getAndStartNewSourceCoordinator(new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE), closeableRegistry);
            int subtask0 = 0;
            int subtask1 = 1;
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask1, 44L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1042L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 5000L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1044L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1044L);
        }
    }

    @Test
    void testWatermarkAlignmentWithIdleness() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry();){
            SourceCoordinator<?, ?> sourceCoordinator1 = this.getAndStartNewSourceCoordinator(new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE), closeableRegistry);
            int subtask0 = 0;
            int subtask1 = 1;
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask1, 44L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1042L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, Long.MAX_VALUE);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1044L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1044L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1042L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, Long.MAX_VALUE);
            this.reportWatermarkEvent(sourceCoordinator1, subtask1, Long.MAX_VALUE);
            this.assertLatestWatermarkAlignmentEvent(subtask0, Long.MAX_VALUE);
            this.assertLatestWatermarkAlignmentEvent(subtask1, Long.MAX_VALUE);
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1042L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask1, 46L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1042L);
        }
    }

    @Test
    void testWatermarkAlignmentWithTwoGroups() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry();){
            long maxDrift = 1000L;
            SourceCoordinator<?, ?> sourceCoordinator1 = this.getAndStartNewSourceCoordinator(new WatermarkAlignmentParams(maxDrift, "group1", Long.MAX_VALUE), closeableRegistry);
            SourceCoordinator<?, ?> sourceCoordinator2 = this.getAndStartNewSourceCoordinator(new WatermarkAlignmentParams(maxDrift, "group2", Long.MAX_VALUE), closeableRegistry);
            int subtask0 = 0;
            int subtask1 = 1;
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.reportWatermarkEvent(sourceCoordinator2, subtask1, 44L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1044L);
            this.reportWatermarkEvent(sourceCoordinator1, subtask0, 5000L);
            this.assertLatestWatermarkAlignmentEvent(subtask0, 6000L);
            this.assertLatestWatermarkAlignmentEvent(subtask1, 1044L);
        }
    }

    private SourceCoordinator<?, ?> getAndStartNewSourceCoordinator(WatermarkAlignmentParams watermarkAlignmentParams, AutoCloseableRegistry closeableRegistry) throws Exception {
        SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> sourceCoordinator = this.getNewSourceCoordinator(watermarkAlignmentParams);
        closeableRegistry.registerCloseable(sourceCoordinator);
        sourceCoordinator.start();
        this.setAllReaderTasksReady(sourceCoordinator);
        return sourceCoordinator;
    }

    private void reportWatermarkEvent(SourceCoordinator<?, ?> sourceCoordinator1, int subtask, long watermark) {
        sourceCoordinator1.handleEventFromOperator(subtask, 0, (OperatorEvent)new ReportedWatermarkEvent(watermark));
        this.waitForCoordinatorToProcessActions();
        sourceCoordinator1.announceCombinedWatermark();
    }

    private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWatermark) {
        List<OperatorEvent> events = this.receivingTasks.getSentEventsForSubtask(subtask);
        Assertions.assertThat(events).isNotEmpty();
        Assertions.assertThat((Object)events.get(events.size() - 1)).isEqualTo((Object)new WatermarkAlignmentEvent(expectedWatermark));
    }
}

