/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SourceOperatorSplitWatermarkAlignmentTest {
    /** Package-private no-argument constructor; performs no initialization. */
    SourceOperatorSplitWatermarkAlignmentTest() {}

    /**
     * Builds an operator with watermark alignment (group "group-1", no idleness), feeds it two
     * splits and checks which split ids the reader reports as paused after each alignment
     * event and after each emitted record.
     */
    @Test
    void testSplitWatermarkAlignment() throws Exception {
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        StreamMockEnvironment environment = this.getTestingEnvironment();
        TestingSourceOperator sourceOperator = new TestingSourceOperator(
                new StreamOperatorParameters(
                        (StreamTask)new SourceOperatorStreamTask((Environment)environment),
                        (StreamConfig)new MockStreamConfig(new Configuration(), 1),
                        new MockOutput(new ArrayList()),
                        TestProcessingTimeService::new,
                        null,
                        null),
                reader,
                WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)context -> new TestWatermarkGenerator())
                        .withTimestampAssigner((SerializableTimestampAssigner & Serializable)(element, recordTimestamp) -> element.intValue())
                        .withWatermarkAlignment("group-1", Duration.ofMillis(1L)),
                (ProcessingTimeService)new TestProcessingTimeService(),
                new MockOperatorEventGateway(),
                1,
                5,
                true);
        sourceOperator.initializeState((StreamTaskStateInitializer)new StreamTaskStateInitializerImpl((Environment)environment, (StateBackend)new HashMapStateBackend()));
        sourceOperator.open();

        // First split (id 0) holds records 5 and 11; second split (id 1) holds records 3 and 12.
        MockSourceSplit firstSplit = new MockSourceSplit(0, 0, 10);
        MockSourceSplit secondSplit = new MockSourceSplit(1, 10, 20);
        firstSplit.addRecord(5);
        firstSplit.addRecord(11);
        secondSplit.addRecord(3);
        secondSplit.addRecord(12);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(firstSplit, secondSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));

        CollectingDataOutput collected = new CollectingDataOutput();
        sourceOperator.emitNext(collected);

        // Max allowed watermark 4: only split "0" is expected to be paused.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent(4L));
        Assertions.assertThat((Collection)reader.getPausedSplits()).containsExactly((Object[])new String[]{"0"});

        // Max allowed watermark 5: no split is expected to be paused any more.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent(5L));
        Assertions.assertThat((Collection)reader.getPausedSplits()).isEmpty();

        // Two more emissions: split "0" is expected to be paused again.
        sourceOperator.emitNext(collected);
        sourceOperator.emitNext(collected);
        Assertions.assertThat((Collection)reader.getPausedSplits()).containsExactly((Object[])new String[]{"0"});

        // One more emission: both splits are expected to be paused.
        sourceOperator.emitNext(collected);
        Assertions.assertThat((Collection)reader.getPausedSplits()).containsExactly((Object[])new String[]{"0", "1"});
    }

    /**
     * Checks idleness reporting around the task's back-pressure timers.
     *
     * <p>While the hard back-pressure timer is marked as started, and then while the soft
     * back-pressure timer is marked as started, the collected output must contain neither
     * {@link WatermarkStatus#IDLE} nor any watermark. Once both timers have been ended and more
     * idle timeouts elapse, {@link WatermarkStatus#IDLE} is expected, still without a watermark.
     */
    @Test
    void testBackpressureAndIdleness() throws Exception {
        final long idleTimeoutMillis = 100L;
        final int rounds = 10;
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdleness(reader, clock, idleTimeoutMillis);

        // The first split carries two records, the second split carries none.
        MockSourceSplit splitWithRecords = new MockSourceSplit(0, 0, 10).addRecord(42).addRecord(44);
        MockSourceSplit splitWithoutRecords = new MockSourceSplit(1, 10, 20);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(splitWithRecords, splitWithoutRecords), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput collected = new CollectingDataOutput();
        sourceOperator.emitNext(collected);

        TaskIOMetricGroup ioMetrics = sourceOperator.getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup();

        // Phase 1: hard back-pressure timer running; time advances and the operator keeps emitting.
        ioMetrics.getHardBackPressuredTimePerSecond().markStart();
        for (int round = 0; round < rounds; ++round) {
            clock.advance(idleTimeoutMillis);
            sourceOperator.emitNext(collected);
        }
        Assertions.assertThat(collected.getEvents()).doesNotContain(new Object[]{WatermarkStatus.IDLE});
        Assertions.assertThat(collected.getEvents()).doNotHave((Condition)new AnyWatermark());
        ioMetrics.getHardBackPressuredTimePerSecond().markEnd();

        // Phase 2: soft back-pressure timer running; only time advances.
        ioMetrics.getSoftBackPressuredTimePerSecond().markStart();
        for (int round = 0; round < rounds; ++round) {
            clock.advance(idleTimeoutMillis);
        }
        Assertions.assertThat(collected.getEvents()).doesNotContain(new Object[]{WatermarkStatus.IDLE});
        Assertions.assertThat(collected.getEvents()).doNotHave((Condition)new AnyWatermark());
        ioMetrics.getSoftBackPressuredTimePerSecond().markEnd();

        // Phase 3: neither timer running; IDLE is now expected, but still no watermark.
        for (int round = 0; round < rounds; ++round) {
            clock.advance(idleTimeoutMillis);
        }
        Assertions.assertThat(collected.getEvents()).contains(new Object[]{WatermarkStatus.IDLE});
        Assertions.assertThat(collected.getEvents()).doNotHave((Condition)new AnyWatermark());
    }

    /**
     * With a single split and a max allowed watermark announced up front, repeatedly emits and
     * advances processing time by the idle timeout, then asserts that
     * {@link WatermarkStatus#IDLE} was never collected.
     *
     * @param usePerSplitOutputs forwarded as the third constructor argument of
     *     {@link MockSourceReader}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testSingleSplitWatermarkAlignmentAndIdleness(boolean usePerSplitOutputs) throws Exception {
        final long idleTimeoutMillis = 100L;
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, usePerSplitOutputs);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        clock.setCurrentTime(1L);
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdleness(reader, clock, idleTimeoutMillis);

        MockSourceSplit onlySplit = new MockSourceSplit(0, 0, 10);
        int maxAllowedWatermark = 4;
        int maxEmittedWatermark = maxAllowedWatermark + 1;
        // Four records of value 1, then one just above the allowed watermark, then one far above it.
        for (int n = 0; n < 4; ++n) {
            onlySplit.addRecord(1);
        }
        onlySplit.addRecord(maxEmittedWatermark);
        onlySplit.addRecord(maxEmittedWatermark + 100);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(onlySplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));

        CollectingDataOutput collected = new CollectingDataOutput();
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)maxAllowedWatermark));
        for (int round = 0; round < 10; ++round) {
            sourceOperator.emitNext(collected);
            clock.advance(idleTimeoutMillis);
        }
        Assertions.assertThat(collected.getEvents()).doesNotContain(new Object[]{WatermarkStatus.IDLE});
    }

    /**
     * With two splits and a max allowed watermark of 4, drains the operator while advancing
     * processing time by just under the idle timeout per step. Expects split "0" to be paused
     * right after the alignment event, both splits to be paused once the operator is no longer
     * available, and no collected watermark above {@code maxAllowedWatermark + 1}.
     */
    @Test
    void testMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
        final long idleTimeoutMillis = 100L;
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdleness(reader, clock, idleTimeoutMillis);

        MockSourceSplit firstSplit = new MockSourceSplit(0, 0, 10);
        MockSourceSplit secondSplit = new MockSourceSplit(1, 10, 20);
        int maxAllowedWatermark = 4;
        int maxEmittedWatermark = maxAllowedWatermark + 1;
        firstSplit.addRecord(maxEmittedWatermark);
        firstSplit.addRecord(maxEmittedWatermark + 100);
        // Seven records of value 3, followed by one far above the allowed watermark.
        for (int n = 0; n < 7; ++n) {
            secondSplit.addRecord(3);
        }
        secondSplit.addRecord(maxEmittedWatermark + 100);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(firstSplit, secondSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));

        CollectingDataOutput collected = new CollectingDataOutput();
        sourceOperator.emitNext(collected);
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)maxAllowedWatermark));
        Assertions.assertThat((Collection)reader.getPausedSplits()).containsExactly((Object[])new String[]{"0"});

        // Each step advances the clock by one millisecond less than the idle timeout.
        while (sourceOperator.isAvailable()) {
            clock.advance(idleTimeoutMillis - 1L);
            sourceOperator.emitNext(collected);
        }
        Assertions.assertThat((Collection)reader.getPausedSplits()).containsExactly((Object[])new String[]{"0", "1"});
        Assertions.assertThat(collected.getEvents()).doNotHave((Condition)new WatermarkAbove(maxEmittedWatermark));
    }

    /**
     * Drains both splits until {@code emitNext} stops returning
     * {@link DataInputStatus#MORE_AVAILABLE}, then announces a max allowed watermark of 4 and
     * expects the reader to report no paused splits.
     */
    @Test
    void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
        final long idleTimeoutMillis = 100L;
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdleness(reader, clock, idleTimeoutMillis);

        MockSourceSplit firstSplit = new MockSourceSplit(0, 0, 1);
        MockSourceSplit secondSplit = new MockSourceSplit(1, 10, 20);
        int maxAllowedWatermark = 4;
        int maxEmittedWatermark = maxAllowedWatermark + 1;
        firstSplit.addRecord(maxEmittedWatermark);
        secondSplit.addRecord(3);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(firstSplit, secondSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));

        CollectingDataOutput collected = new CollectingDataOutput();
        // Keep emitting for as long as the operator reports that more data is available.
        DataInputStatus status;
        do {
            status = sourceOperator.emitNext(collected);
        } while (status == DataInputStatus.MORE_AVAILABLE);

        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)maxAllowedWatermark));
        Assertions.assertThat((Collection)reader.getPausedSplits()).isEmpty();
    }

    /**
     * Uses a map-backed metric registry to check that the listed per-split metric names are
     * registered after the split is added, and that, once the operator has been drained, the
     * split's watermark metric group is closed and those names are gone from the registry.
     */
    @Test
    void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered() throws Exception {
        final long idleTimeoutMillis = 100L;
        List<String> expectedMetricNames = Arrays.asList("idleTimeMsPerSecond", "accumulatedIdleTimeMs", "activeTimeMsPerSecond", "accumulatedActiveTimeMs", "pausedTimeMsPerSecond", "accumulatedPausedTimeMs", "currentWatermark");
        Map<String, Metric> registeredMetrics = new ConcurrentHashMap<String, Metric>();
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdlenessAndRegistry(reader, clock, idleTimeoutMillis, registeredMetrics);

        MockSourceSplit onlySplit = new MockSourceSplit(0, 0, 1);
        onlySplit.addRecord(5);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(onlySplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput collected = new CollectingDataOutput();

        // Capture the group before draining so its closed state can be inspected afterwards.
        AbstractMetricGroup watermarkMetricGroup = (AbstractMetricGroup)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).getSplitWatermarkMetricGroup();
        for (String metricName : expectedMetricNames) {
            Assertions.assertThat((boolean)registeredMetrics.containsKey(metricName)).isTrue();
        }

        // Keep emitting for as long as the operator reports that more data is available.
        DataInputStatus status;
        do {
            status = sourceOperator.emitNext(collected);
        } while (status == DataInputStatus.MORE_AVAILABLE);

        Assertions.assertThat((boolean)watermarkMetricGroup.isClosed()).isTrue();
        for (String metricName : expectedMetricNames) {
            Assertions.assertThat((boolean)registeredMetrics.containsKey(metricName)).isFalse();
        }
    }

    /**
     * Walks two splits through a sequence of alignment events (4, 7, 10), emissions and
     * processing-time advances, asserting after each step the active / paused / idle flag
     * reported by each split's metric group.
     */
    @Test
    void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
        final long idleTimeoutMillis = 100L;
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdleness(reader, clock, idleTimeoutMillis);

        MockSourceSplit firstSplit = new MockSourceSplit(0, 0, 10);
        MockSourceSplit secondSplit = new MockSourceSplit(1, 10, 20);
        int firstAllowedWatermark = 4;
        int secondAllowedWatermark = 7;
        int thirdAllowedWatermark = 10;
        // First split holds records 5 and 6; second split holds records 3 and 8.
        firstSplit.addRecord(5);
        secondSplit.addRecord(3);
        firstSplit.addRecord(6);
        secondSplit.addRecord(8);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(firstSplit, secondSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput collected = new CollectingDataOutput();

        // Right after assignment both splits are expected to be active.
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(firstSplit.splitId()).isActive()).isTrue();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(secondSplit.splitId()).isActive()).isTrue();

        sourceOperator.emitNext(collected);
        sourceOperator.emitNext(collected);

        // Allowed watermark 4: first split paused, second split still active.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)firstAllowedWatermark));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(firstSplit.splitId()).isPaused()).isTrue();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(secondSplit.splitId()).isActive()).isTrue();

        clock.advance(idleTimeoutMillis - 1L);
        sourceOperator.emitNext(collected);
        for (int round = 0; round < 10; ++round) {
            clock.advance(idleTimeoutMillis);
        }

        // After the idle timeouts: first split stays paused, second split is idle.
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(firstSplit.splitId()).isPaused()).isTrue();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(secondSplit.splitId()).isIdle()).isTrue();

        // Allowed watermark 7: first split active again, second split still idle.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)secondAllowedWatermark));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(firstSplit.splitId()).isActive()).isTrue();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(secondSplit.splitId()).isIdle()).isTrue();

        // One more emission: second split is expected to be paused.
        sourceOperator.emitNext(collected);
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(secondSplit.splitId()).isPaused()).isTrue();

        // Allowed watermark 10: both splits active.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)thirdAllowedWatermark));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(firstSplit.splitId()).isActive()).isTrue();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(secondSplit.splitId()).isActive()).isTrue();
    }

    /**
     * Walks a single split (records 5, 6, 7) through alignment events 4, 5 and 7, interleaved
     * with emissions and processing-time advances, asserting the emitted records and the
     * active / paused / idle flag of the split's metric group after each step.
     */
    @Test
    void testStateReportingForSingleSplitWatermarkAlignmentAndIdleness() throws Exception {
        final long idleTimeoutMillis = 100L;
        final int rounds = 10;
        MockSourceReader reader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true);
        TestProcessingTimeService clock = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> sourceOperator = this.createAndOpenSourceOperatorWithIdleness(reader, clock, idleTimeoutMillis);

        MockSourceSplit onlySplit = new MockSourceSplit(0, 0, 10);
        int firstAllowedWatermark = 4;
        int secondAllowedWatermark = 5;
        int thirdAllowedWatermark = 7;
        onlySplit.addRecord(5);
        onlySplit.addRecord(6);
        onlySplit.addRecord(7);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(onlySplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));

        CollectingDataOutput<Integer> collected = new CollectingDataOutput<Integer>();
        sourceOperator.emitNext(collected);
        this.assertOutput(collected, Arrays.asList(5));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isActive()).isTrue();

        // Allowed watermark 4: the split is expected to be paused ...
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)firstAllowedWatermark));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isPaused()).isTrue();
        for (int round = 0; round < rounds; ++round) {
            clock.advance(idleTimeoutMillis);
        }
        // ... and to remain paused, not idle, while idle timeouts elapse.
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isIdle()).isFalse();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isPaused()).isTrue();

        // Allowed watermark 5: active first, idle after the timeouts elapse.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)secondAllowedWatermark));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isActive()).isTrue();
        for (int round = 0; round < rounds; ++round) {
            clock.advance(idleTimeoutMillis);
        }
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isIdle()).isTrue();

        // Emitting record 6 is expected to leave the split paused and no longer idle.
        sourceOperator.emitNext(collected);
        this.assertOutput(collected, Arrays.asList(5, 6));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isPaused()).isTrue();
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isIdle()).isFalse();

        // Allowed watermark 7: active first, idle after the timeouts elapse.
        sourceOperator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)thirdAllowedWatermark));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isActive()).isTrue();
        for (int round = 0; round < rounds; ++round) {
            clock.advance(idleTimeoutMillis);
        }
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isIdle()).isTrue();

        // Emitting record 7 is expected to leave the split active.
        sourceOperator.emitNext(collected);
        this.assertOutput(collected, Arrays.asList(5, 6, 7));
        Assertions.assertThat((Boolean)sourceOperator.getSplitMetricGroup(onlySplit.splitId()).isActive()).isTrue();
    }

    /**
     * Asserts that the {@link StreamRecord} values collected so far equal the expected values,
     * in order. Events that are not stream records are ignored.
     *
     * @param actualOutput collected operator output to inspect
     * @param expectedOutput record values expected, in emission order
     */
    private void assertOutput(CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) {
        List<Integer> recordValues = new ArrayList<Integer>();
        for (Object event : actualOutput.getEvents()) {
            if (!(event instanceof StreamRecord)) {
                continue;
            }
            // Unbox to int first, exactly as the primitive stream mapping did.
            int value = (Integer)((StreamRecord)event).getValue();
            recordValues.add(value);
        }
        Assertions.assertThat(recordValues).containsExactly((Object[])expectedOutput.toArray(new Integer[0]));
    }

    /**
     * Creates and opens a source operator with idleness enabled, running in a fresh testing
     * environment obtained from {@code getTestingEnvironment()}.
     *
     * @param sourceReader reader the operator pulls records from
     * @param processingTimeService processing-time service handed to the operator
     * @param idleTimeout idleness timeout in milliseconds
     * @return the opened operator
     */
    private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdleness(MockSourceReader sourceReader, TestProcessingTimeService processingTimeService, long idleTimeout) throws Exception {
        StreamMockEnvironment freshEnvironment = this.getTestingEnvironment();
        return this.createAndOpenSourceOperatorWithIdlenessAndEnv(sourceReader, processingTimeService, idleTimeout, freshEnvironment);
    }

    /**
     * Creates and opens a source operator with idleness enabled whose task metric group is
     * backed by a {@code TestMetricRegistry} writing into the given map.
     *
     * @param sourceReader reader the operator pulls records from
     * @param processingTimeService processing-time service handed to the operator
     * @param idleTimeout idleness timeout in milliseconds
     * @param registry map that receives registered metrics, keyed by metric name
     * @return the opened operator
     */
    private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdlenessAndRegistry(MockSourceReader sourceReader, TestProcessingTimeService processingTimeService, long idleTimeout, Map<String, Metric> registry) throws Exception {
        StreamMockEnvironment environment = this.getTestingEnvironment();
        TaskMetricGroup taskMetrics = TaskManagerMetricGroup
                .createTaskManagerMetricGroup((MetricRegistry)new TestMetricRegistry(registry), (String)"localhost", (ResourceID)ResourceID.generate())
                .addJob(new JobID(), "jobName")
                .addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "test");
        environment.setTaskMetricGroup(taskMetrics);
        return this.createAndOpenSourceOperatorWithIdlenessAndEnv(sourceReader, processingTimeService, idleTimeout, environment);
    }

    /**
     * Builds a {@link TestingSourceOperator} in the given environment, initializes its state
     * with a {@link HashMapStateBackend} and opens it.
     *
     * <p>The watermark strategy uses {@code TestWatermarkGenerator}, takes each record's integer
     * value as its timestamp, joins alignment group "group-1" and enables idleness with the
     * given timeout.
     *
     * @param sourceReader reader the operator pulls records from
     * @param processingTimeService processing-time service, used both by the operator parameters
     *     and by the operator itself
     * @param idleTimeout idleness timeout in milliseconds
     * @param env environment the containing task runs in
     * @return the opened operator
     */
    private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdlenessAndEnv(MockSourceReader sourceReader, TestProcessingTimeService processingTimeService, long idleTimeout, Environment env) throws Exception {
        TestingSourceOperator<Integer> sourceOperator = new TestingSourceOperator<Integer>(
                (StreamOperatorParameters<Integer>)new StreamOperatorParameters(
                        (StreamTask)new SourceOperatorStreamTask(env),
                        (StreamConfig)new MockStreamConfig(new Configuration(), 1),
                        new MockOutput(new ArrayList()),
                        () -> processingTimeService,
                        null,
                        null),
                (SourceReader<Integer, MockSourceSplit>)sourceReader,
                (WatermarkStrategy<Integer>)WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)context -> new TestWatermarkGenerator())
                        .withTimestampAssigner((SerializableTimestampAssigner & Serializable)(element, recordTimestamp) -> element.intValue())
                        .withWatermarkAlignment("group-1", Duration.ofMillis(1L))
                        .withIdleness(Duration.ofMillis(idleTimeout)),
                (ProcessingTimeService)processingTimeService,
                new MockOperatorEventGateway(),
                1,
                5,
                true);
        sourceOperator.initializeState((StreamTaskStateInitializer)new StreamTaskStateInitializerImpl(env, (StateBackend)new HashMapStateBackend()));
        sourceOperator.open();
        return sourceOperator;
    }

    /**
     * Creates a new {@link StreamMockEnvironment} built from default-constructed
     * configurations, a {@link MockInputSplitProvider} and a {@link TestTaskStateManager}.
     *
     * @return a fresh environment instance on every call
     */
    private StreamMockEnvironment getTestingEnvironment() {
        return new StreamMockEnvironment(
                new Configuration(),
                new Configuration(),
                new ExecutionConfig(),
                1L,
                new MockInputSplitProvider(),
                1,
                new TestTaskStateManager());
    }

    /**
     * Metric registry that records registrations in a caller-supplied map keyed by metric name,
     * so tests can observe which metrics are currently registered.
     */
    static class TestMetricRegistry
    extends NoOpMetricRegistry {
        private final Map<String, Metric> metrics;

        /**
         * @param metrics map that receives registered metrics; it is stored by reference, not copied
         */
        TestMetricRegistry(Map<String, Metric> metrics) {
            this.metrics = metrics;
        }

        /** Stores the metric under its name, replacing any entry already present for that name. */
        public void register(Metric metric, String metricName, AbstractMetricGroup<?> group) {
            this.metrics.put(metricName, metric);
        }

        /** Removes the entry stored under the metric's name, if a non-null one exists. */
        public void unregister(Metric metric, String metricName, AbstractMetricGroup<?> group) {
            Metric registered = this.metrics.get(metricName);
            if (registered == null) {
                return;
            }
            this.metrics.remove(metricName);
        }
    }

    /** AssertJ condition that matches any event that is a {@link Watermark}. */
    public static class AnyWatermark
    extends Condition<Object> {
        public AnyWatermark() {
            super(
                    candidate -> candidate instanceof Watermark,
                    "any watermark",
                    new Object[0]);
        }
    }

    /**
     * AssertJ condition that matches {@link Watermark} events whose timestamp is strictly
     * greater than a given bound. Events that are not watermarks never match.
     */
    public static class WatermarkAbove
    extends Condition<Object> {
        /**
         * @param maxEmittedWatermark exclusive lower bound; only watermarks above it match
         */
        public WatermarkAbove(int maxEmittedWatermark) {
            super(
                    candidate -> candidate instanceof Watermark
                            && ((Watermark)candidate).getTimestamp() > (long)maxEmittedWatermark,
                    "watermark value of greater than %d",
                    new Object[]{maxEmittedWatermark});
        }
    }

    /**
     * Watermark generator that remembers the largest event timestamp seen so far. It emits a
     * watermark whenever an event raises that maximum, and re-emits the current maximum on
     * every periodic emit.
     */
    private static class TestWatermarkGenerator
    implements WatermarkGenerator<Integer> {
        /** Largest event timestamp seen so far; {@code Long.MIN_VALUE} until the first event. */
        private long maxWatermark = Long.MIN_VALUE;

        private TestWatermarkGenerator() {
        }

        /** Emits a watermark only when {@code eventTimestamp} exceeds the current maximum. */
        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            if (eventTimestamp <= this.maxWatermark) {
                return;
            }
            this.maxWatermark = eventTimestamp;
            this.emitCurrentMaximum(output);
        }

        /** Emits the current maximum unconditionally. */
        public void onPeriodicEmit(WatermarkOutput output) {
            this.emitCurrentMaximum(output);
        }

        /** Sends a watermark carrying the current maximum timestamp to the given output. */
        private void emitCurrentMaximum(WatermarkOutput output) {
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(this.maxWatermark));
        }
    }
}

