/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.BlockingSourceContext;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StatefulSequenceSourceTest {
    StatefulSequenceSourceTest() {
    }

    @Test
    void testCheckpointRestore() throws Exception {
        boolean initElement = false;
        int maxElement = 100;
        int maxParallelsim = 2;
        HashSet<Long> expectedOutput = new HashSet<Long>();
        for (long i = 0L; i <= 100L; ++i) {
            expectedOutput.add(i);
        }
        final ConcurrentHashMap outputCollector = new ConcurrentHashMap();
        final OneShotLatch latchToTrigger1 = new OneShotLatch();
        final OneShotLatch latchToWait1 = new OneShotLatch();
        final OneShotLatch latchToTrigger2 = new OneShotLatch();
        final OneShotLatch latchToWait2 = new OneShotLatch();
        final StatefulSequenceSource source1 = new StatefulSequenceSource(0L, 100L);
        StreamSource src1 = new StreamSource((SourceFunction)source1);
        AbstractStreamOperatorTestHarness testHarness1 = new AbstractStreamOperatorTestHarness(src1, 2, 2, 0);
        testHarness1.open();
        final StatefulSequenceSource source2 = new StatefulSequenceSource(0L, 100L);
        StreamSource src2 = new StreamSource((SourceFunction)source2);
        AbstractStreamOperatorTestHarness testHarness2 = new AbstractStreamOperatorTestHarness(src2, 2, 2, 1);
        testHarness2.open();
        CheckedThread runner1 = new CheckedThread(){

            public void go() throws Exception {
                source1.run(new BlockingSourceContext("1", latchToTrigger1, latchToWait1, outputCollector, 21));
            }
        };
        CheckedThread runner2 = new CheckedThread(){

            public void go() throws Exception {
                source2.run(new BlockingSourceContext("2", latchToTrigger2, latchToWait2, outputCollector, 32));
            }
        };
        runner1.start();
        runner2.start();
        if (!latchToTrigger1.isTriggered()) {
            latchToTrigger1.await();
        }
        if (!latchToTrigger2.isTriggered()) {
            latchToTrigger2.await();
        }
        OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState(testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L));
        final StatefulSequenceSource source3 = new StatefulSequenceSource(0L, 100L);
        StreamSource src3 = new StreamSource((SourceFunction)source3);
        OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, 2, 2, 1, 0);
        AbstractStreamOperatorTestHarness testHarness3 = new AbstractStreamOperatorTestHarness(src3, 2, 1, 0);
        testHarness3.setup();
        testHarness3.initializeState(initState);
        testHarness3.open();
        final OneShotLatch latchToTrigger3 = new OneShotLatch();
        final OneShotLatch latchToWait3 = new OneShotLatch();
        latchToWait3.trigger();
        CheckedThread runner3 = new CheckedThread(){

            public void go() throws Exception {
                source3.run(new BlockingSourceContext("3", latchToTrigger3, latchToWait3, outputCollector, 3));
            }
        };
        runner3.start();
        runner3.sync();
        Assertions.assertThat(outputCollector).hasSize(3);
        HashSet<Long> dedupRes = new HashSet<Long>(Math.abs(100) + 1);
        for (Map.Entry elementsPerTask : outputCollector.entrySet()) {
            String key = (String)elementsPerTask.getKey();
            List elements = (List)outputCollector.get(key);
            Assertions.assertThat((List)elements).isNotEmpty();
            for (Long elem : elements) {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)dedupRes.add(elem)).as("Duplicate entry: " + elem, new Object[0])).isTrue();
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)expectedOutput.contains(elem)).as("Unexpected element: " + elem, new Object[0])).isTrue();
            }
        }
        Assertions.assertThat(dedupRes).hasSize(Math.abs(-100) + 1);
        latchToWait1.trigger();
        latchToWait2.trigger();
        runner1.sync();
        runner2.sync();
    }
}

