/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector;
import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IterateITCase
extends AbstractTestBase {
    // Logger used by the retrying tests to report each failed attempt.
    private static final Logger LOG = LoggerFactory.getLogger(IterateITCase.class);
    // One flag per subtask index. IterationHead sets a flag when it receives a fed-back `true`
    // record; testSimpleIteration re-creates the array before every attempt and asserts all flags afterwards.
    private static boolean[] iterated;
    // Parallelism the tests are written against: the slot count reported by miniClusterResource
    // (declared outside this file -- presumably inherited from AbstractTestBase; confirm).
    private int parallelism = miniClusterResource.getNumberSlots();
    // Shared pass-through functions. All five are assigned in the static initializer of this class.
    public static CoMapFunction<Integer, String, String> noOpCoMap;
    public static MapFunction<Integer, Integer> noOpIntMap;
    public static MapFunction<String, String> noOpStrMap;
    public static CoMapFunction<Integer, Integer, Integer> noOpIntCoMap;
    public static MapFunction<Boolean, Boolean> noOpBoolMap;

    /**
     * Opens an iteration directly on the {@code fromElements} source (no intermediate map),
     * feeds a map of the iteration back into it and expects
     * {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): judging by the test name the rejection is caused by the source and the
     * feedback map running at different parallelism; the code here only pins the exception type.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testIncorrectParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 10);

        IterativeStream<Integer> iter1 = source.iterate();
        SingleOutputStreamOperator<Integer> map1 = iter1.map(noOpIntMap);
        iter1.closeWith(map1).print();
    }

    /**
     * Closes the same iteration twice, each time with a freshly created feedback map.
     * There is no assertion: the test passes as long as neither {@code closeWith} call throws.
     */
    @Test
    public void testDoubleClosing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The extra map after the source gives the loop input the same operator shape as the loop body.
        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> iter1 = source.iterate();

        iter1.closeWith(iter1.map(noOpIntMap));
        iter1.closeWith(iter1.map(noOpIntMap));
    }

    /**
     * Feeds back a map whose parallelism was explicitly set to {@code parallelism / 2} and
     * expects the iteration to reject it with {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testDifferingParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> iter1 = source.iterate();

        // The feedback stream deliberately runs at half the parallelism used elsewhere in this class.
        iter1.closeWith(iter1.map(noOpIntMap).setParallelism(parallelism / 2));
    }

    /**
     * Co-iteration variant of {@code testDifferingParallelism}: the feedback co-map has its
     * parallelism set to {@code parallelism / 2} and closing the loop is expected to throw
     * {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testCoDifferingParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream.ConnectedIterativeStreams<Integer, Integer> coIter =
                source.iterate().withFeedbackType(Integer.class);

        coIter.closeWith(coIter.map(noOpIntCoMap).setParallelism(parallelism / 2));
    }

    /**
     * Opens two iterations on the same input and tries to close the second one with a stream
     * derived from the first; expects {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testClosingFromOutOfLoop() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> iter1 = source.iterate();
        IterativeStream<Integer> iter2 = source.iterate();

        // The feedback stream belongs to iter1, not to the iteration being closed.
        iter2.closeWith(iter1.map(noOpIntMap));
    }

    /**
     * Co-iteration variant of {@code testClosingFromOutOfLoop}: a connected iteration is closed
     * with a stream derived from a different, plain iteration on the same input; expects
     * {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testCoIterClosingFromOutOfLoop() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> iter1 = source.iterate();
        IterativeStream.ConnectedIterativeStreams<Integer, Integer> coIter =
                source.iterate().withFeedbackType(Integer.class);

        // The feedback stream belongs to iter1, not to coIter.
        coIter.closeWith(iter1.map(noOpIntMap));
    }

    /**
     * Builds an iteration that is consumed but never closed with a feedback stream, then
     * executes the job and expects {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testExecutionWithEmptyIteration() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> iter1 = source.iterate();

        // Note: no closeWith(...) call -- the loop has a head but no feedback edge.
        iter1.map(noOpIntMap).print();

        env.execute();
    }

    /**
     * Derives a co-iteration from an existing iteration via {@code withFeedbackType} and closes
     * both independently.
     *
     * <p>Verifies that the stream graph then contains two iteration source/sink pairs and that,
     * for each pair, the first consumer of the iteration source is also the first producer
     * feeding the iteration sink.
     */
    @Test
    public void testImmutabilityWithCoiteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);

        IterativeStream<Integer> iter1 = source.iterate();
        // Calling withFeedbackType must not alter iter1; both loops are closed separately below.
        IterativeStream.ConnectedIterativeStreams<Integer, String> iter2 =
                iter1.withFeedbackType(String.class);

        iter1.closeWith(iter1.map(noOpIntMap)).print();
        iter2.closeWith(iter2.map(noOpCoMap)).print();

        StreamGraph graph = env.getStreamGraph();

        Assert.assertEquals(2, graph.getIterationSourceSinkPairs().size());

        for (Tuple2<StreamNode, StreamNode> sourceSinkPair : graph.getIterationSourceSinkPairs()) {
            StreamNode firstHead = graph.getTargetVertex(sourceSinkPair.f0.getOutEdges().get(0));
            StreamNode firstTail = graph.getSourceVertex(sourceSinkPair.f1.getInEdges().get(0));
            Assert.assertEquals(firstHead, firstTail);
        }
    }

    /**
     * Builds one iteration with four head operators and three unioned feedback tails and
     * inspects the resulting stream graph and job graph.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>exactly one iteration source/sink pair exists, with 4 out-edges on the source,
     *       3 in-edges on the sink and equal parallelism on both;</li>
     *   <li>the edge to the half-parallelism head is rebalanced, the edge to the
     *       full-parallelism head is forwarded;</li>
     *   <li>the shuffled tail reaches the sink through a shuffle partitioner and the "even"
     *       split output through a forward partitioner;</li>
     *   <li>the iteration source and sink job vertices share a non-null co-location group.</li>
     * </ul>
     */
    @Test
    public void testmultipleHeadsTailsSimple() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
                .shuffle()
                .map(noOpIntMap)
                .name("ParallelizeMapShuffle");
        DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
                .map(noOpIntMap)
                .name("ParallelizeMapRebalance");

        IterativeStream<Integer> iter1 = source1.union(source2).iterate();

        // Four heads consume the iteration; two of them only end in sinks.
        DataStream<Integer> head1 = iter1.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
        DataStream<Integer> head2 = iter1.map(noOpIntMap).name("IterForwardMap");
        iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink());
        iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink());

        SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
                .map(noOpIntMap)
                .name("EvenOddSourceMap")
                .split(new EvenOddOutputSelector());

        // Three tails are unioned into the feedback: the "even" split output, a broadcast map
        // behind head1, and head2 shuffled.
        iter1.closeWith(source3.select("even").union(
                head1.rebalance().map(noOpIntMap).broadcast(),
                head2.shuffle()));

        StreamGraph graph = env.getStreamGraph();
        JobGraph jg = graph.getJobGraph();

        Assert.assertEquals(1, graph.getIterationSourceSinkPairs().size());

        Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
        StreamNode itSource = sourceSinkPair.f0;
        StreamNode itSink = sourceSinkPair.f1;

        Assert.assertEquals(4, itSource.getOutEdges().size());
        Assert.assertEquals(3, itSink.getInEdges().size());
        Assert.assertEquals(itSource.getParallelism(), itSink.getParallelism());

        for (StreamEdge edge : itSource.getOutEdges()) {
            String headName = graph.getTargetVertex(edge).getOperatorName();
            if (headName.equals("IterRebalanceMap")) {
                Assert.assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
            } else if (headName.equals("IterForwardMap")) {
                Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
            }
        }

        // The tails are identified by the operator that produces the feedback edge. The previous
        // checks compared against the names of the loop *inputs* ("ParallelizeMapShuffle" and a
        // never-assigned "ParallelizeMapForward"), which are not producers of the iteration sink,
        // so those assertions could never run.
        for (StreamEdge edge : itSink.getInEdges()) {
            String tailName = graph.getSourceVertex(edge).getOperatorName();
            if (tailName.equals("IterForwardMap")) {
                Assert.assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
            } else if (tailName.equals("EvenOddSourceMap")) {
                Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
                Assert.assertTrue(edge.getSelectedNames().contains("even"));
            }
        }

        // Locate the iteration source/sink vertices in the job graph by name.
        JobVertex itSource1 = null;
        JobVertex itSink1 = null;
        for (JobVertex vertex : jg.getVertices()) {
            if (vertex.getName().contains("IterationSource")) {
                itSource1 = vertex;
            } else if (vertex.getName().contains("IterationSink")) {
                itSink1 = vertex;
            }
        }

        // Fail with a readable message instead of a NullPointerException if a vertex is missing.
        Assert.assertNotNull("no IterationSource vertex in job graph", itSource1);
        Assert.assertNotNull("no IterationSink vertex in job graph", itSink1);

        Assert.assertNotNull(itSource1.getCoLocationGroup());
        Assert.assertNotNull(itSink1.getCoLocationGroup());
        Assert.assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
    }

    /**
     * Like {@code testmultipleHeadsTailsSimple}, but every feedback tail is a dedicated map with
     * its own partitioning, so both head and tail edges can be checked by operator name.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>exactly one iteration source/sink pair, 4 out-edges, 3 in-edges, equal parallelism;</li>
     *   <li>head "map1" is reached by a forward edge and runs at {@code parallelism}; head
     *       "shuffle" is reached by a rebalance edge and runs at {@code parallelism / 2};</li>
     *   <li>tail "split" feeds back via forward with the "even" selection, tail "bc" via
     *       broadcast, tail "shuffleTail" via shuffle;</li>
     *   <li>the iteration source and sink job vertices share a non-null co-location group.</li>
     * </ul>
     */
    @Test
    public void testmultipleHeadsTailsWithTailPartitioning() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
                .shuffle()
                .map(noOpIntMap);
        DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
                .map(noOpIntMap);

        IterativeStream<Integer> iter1 = source1.union(source2).iterate();

        // Four heads consume the iteration; two of them only end in sinks.
        DataStream<Integer> head1 = iter1.map(noOpIntMap).name("map1");
        DataStream<Integer> head2 = iter1.map(noOpIntMap)
                .setParallelism(parallelism / 2)
                .name("shuffle")
                .rebalance();
        iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink());
        iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink());

        SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
                .map(noOpIntMap)
                .name("split")
                .split(new EvenOddOutputSelector());

        // The shuffled tail gets its own name so the in-edge loop below can find it. Previously
        // that loop looked for "shuffle", which is the name of a head, not of any tail, so the
        // shuffle-partitioner assertion never ran.
        iter1.closeWith(source3.select("even").union(
                head1.map(noOpIntMap).name("bc").broadcast(),
                head2.map(noOpIntMap).name("shuffleTail").shuffle()));

        StreamGraph graph = env.getStreamGraph();
        JobGraph jg = graph.getJobGraph();

        Assert.assertEquals(1, graph.getIterationSourceSinkPairs().size());

        Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
        StreamNode itSource = sourceSinkPair.f0;
        StreamNode itSink = sourceSinkPair.f1;

        Assert.assertEquals(4, itSource.getOutEdges().size());
        Assert.assertEquals(3, itSink.getInEdges().size());
        Assert.assertEquals(itSource.getParallelism(), itSink.getParallelism());

        for (StreamEdge edge : itSource.getOutEdges()) {
            StreamNode head = graph.getTargetVertex(edge);
            String headName = head.getOperatorName();
            if (headName.equals("map1")) {
                Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
                // Expected values follow the parallelism used to build the topology rather than
                // hard-coding 4 and 2.
                Assert.assertEquals(parallelism, head.getParallelism());
            } else if (headName.equals("shuffle")) {
                Assert.assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
                Assert.assertEquals(parallelism / 2, head.getParallelism());
            }
        }

        for (StreamEdge edge : itSink.getInEdges()) {
            String tailName = graph.getSourceVertex(edge).getOperatorName();
            if (tailName.equals("split")) {
                Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
                Assert.assertTrue(edge.getSelectedNames().contains("even"));
            } else if (tailName.equals("bc")) {
                Assert.assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
            } else if (tailName.equals("shuffleTail")) {
                Assert.assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
            }
        }

        // Locate the iteration source/sink vertices in the job graph by name.
        JobVertex itSource1 = null;
        JobVertex itSink1 = null;
        for (JobVertex vertex : jg.getVertices()) {
            if (vertex.getName().contains("IterationSource")) {
                itSource1 = vertex;
            } else if (vertex.getName().contains("IterationSink")) {
                itSink1 = vertex;
            }
        }

        // Fail with a readable message instead of a NullPointerException if a vertex is missing.
        Assert.assertNotNull("no IterationSource vertex in job graph", itSource1);
        Assert.assertNotNull("no IterationSink vertex in job graph", itSink1);

        Assert.assertNotNull(itSource1.getCoLocationGroup());
        Assert.assertNotNull(itSink1.getCoLocationGroup());
        Assert.assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
    }

    /**
     * Runs a boolean feedback loop end to end and checks that every subtask index saw a
     * fed-back record.
     *
     * <p>The input is {@code parallelism * 2} copies of {@code false}. {@link IterationHead}
     * turns each {@code false} into a fed-back {@code true} and, on receiving {@code true},
     * marks its subtask index in {@link #iterated}.
     *
     * <p>The whole scenario is attempted up to five times. After each failed attempt the
     * iteration wait time (initially 3000 ms) is doubled; the failure of the last attempt is
     * rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testSimpleIteration() throws Exception {
        int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; ++numRetry) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // Reset per attempt so flags from a failed run cannot leak into the next one.
                iterated = new boolean[parallelism];

                DataStream<Boolean> source = env
                        .fromCollection(Collections.nCopies(parallelism * 2, false))
                        .map(noOpBoolMap)
                        .name("ParallelizeMap");

                IterativeStream<Boolean> iteration = source.iterate(3000L * timeoutScale);

                DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(noOpBoolMap);

                iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink());

                iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());

                env.execute();

                for (boolean iter : iterated) {
                    Assert.assertTrue(iter);
                }

                break; // success
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);

                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                timeoutScale *= 2;
            }
        }
    }

    /**
     * Runs a connected (co-)iteration whose input is an Integer stream and whose feedback is a
     * String stream, then checks the exact records emitted by the loop head.
     *
     * <p>Scenario:
     * <ul>
     *   <li>input {@code 0, 0}; the feedback is the broadcast head output unioned with an extra
     *       source emitting {@code "1000", "2000"};</li>
     *   <li>the head emits {@code value + 1} for every input record and for every fed-back
     *       number smaller than 2, and on close asserts that it saw 1000 or 2000 on the
     *       feedback side;</li>
     *   <li>calling {@code keyBy(1, 2)} on the co-iteration must throw
     *       {@link InvalidProgramException};</li>
     *   <li>the single-parallelism {@link TestSink} must have collected, after sorting,
     *       {@code "1", "1", "2", "2", "2", "2"}.</li>
     * </ul>
     *
     * <p>The scenario is attempted up to five times, doubling the iteration wait time (initially
     * 2000 ms) after each failure; the failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testCoIteration() throws Exception {
        int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; ++numRetry) {
            try {
                // Fresh result list per attempt; TestSink appends to this static list.
                TestSink.collected = new ArrayList<String>();

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(2);

                DataStream<String> otherSource = env.fromElements("1000", "2000")
                        .map(noOpStrMap)
                        .name("ParallelizeMap");

                IterativeStream.ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
                        .map(noOpIntMap)
                        .name("ParallelizeMap")
                        .iterate(2000L * timeoutScale)
                        .withFeedbackType(Types.STRING);

                try {
                    coIt.keyBy(1, 2);
                    Assert.fail();
                } catch (InvalidProgramException expected) {
                    // Expected: positional keys are rejected for this co-iteration.
                }

                DataStream<String> head = coIt.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
                    private static final long serialVersionUID = 1L;
                    boolean seenFromSource = false;

                    @Override
                    public void flatMap1(Integer value, Collector<String> out) throws Exception {
                        out.collect(Integer.toString(value + 1));
                    }

                    @Override
                    public void flatMap2(String value, Collector<String> out) throws Exception {
                        int intVal = Integer.parseInt(value);
                        if (intVal < 2) {
                            out.collect(Integer.toString(intVal + 1));
                        }
                        // 1000 / 2000 only ever arrive from otherSource through the feedback union.
                        if (intVal == 1000 || intVal == 2000) {
                            seenFromSource = true;
                        }
                    }

                    @Override
                    public void close() {
                        Assert.assertTrue(seenFromSource);
                    }
                });

                coIt.map(new CoMapFunction<Integer, String, String>() {

                    @Override
                    public String map1(Integer value) throws Exception {
                        return value.toString();
                    }

                    @Override
                    public String map2(String value) throws Exception {
                        return value;
                    }
                }).addSink(new ReceiveCheckNoOpSink());

                coIt.closeWith(head.broadcast().union(otherSource));

                head.addSink(new TestSink()).setParallelism(1);

                // Second argument false: the graph is only inspected here, env.execute() still needs it.
                Assert.assertEquals(1, env.getStreamGraph("Flink Streaming Job", false).getIterationSourceSinkPairs().size());

                env.execute();

                Collections.sort(TestSink.collected);
                Assert.assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);

                break; // success
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);

                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                timeoutScale *= 2;
            }
        }
    }

    /**
     * Runs an iteration whose input and both feedback streams are keyed by {@code value % 3}.
     *
     * <p>The head counts down every record to zero through the feedback edge. Each head instance
     * remembers {@code murmurHash(value % 3) % 3} of the first record it sees and asserts that
     * every later record yields the same value; on close it asserts that it received more than
     * one record.
     *
     * <p>The job runs with parallelism and max parallelism {@code parallelism - 1}. The scenario
     * is attempted up to five times, doubling the iteration wait time (initially 3000 ms) after
     * each failure; the failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testGroupByFeedback() throws Exception {
        int numRetries = 5;
        int timeoutScale = 1;

        for (int numRetry = 0; numRetry < numRetries; ++numRetry) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism - 1);
                env.getConfig().setMaxParallelism(env.getParallelism());

                KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {

                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value % 3;
                    }
                };

                DataStream<Integer> source = env.fromElements(1, 2, 3)
                        .map(noOpIntMap)
                        .name("ParallelizeMap");

                IterativeStream<Integer> it = source.keyBy(key).iterate(3000L * timeoutScale);

                DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
                    int received = 0;
                    // -1 means "no record seen yet"; afterwards the hash bucket of the first record.
                    int key = -1;

                    @Override
                    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                        ++received;
                        int bucket = MathUtils.murmurHash(value % 3) % 3;
                        if (key == -1) {
                            key = bucket;
                        } else {
                            Assert.assertEquals(key, bucket);
                        }
                        if (value > 0) {
                            out.collect(value - 1);
                        }
                    }

                    @Override
                    public void close() {
                        Assert.assertTrue(received > 1);
                    }
                });

                // Feedback is the union of the keyed head output and a keyed map of it.
                it.closeWith(head.keyBy(key).union(head.map(noOpIntMap).keyBy(key)))
                        .addSink(new ReceiveCheckNoOpSink());

                env.execute();

                break; // success
            } catch (Throwable t) {
                LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);

                if (numRetry >= numRetries - 1) {
                    throw t;
                }
                timeoutScale *= 2;
            }
        }
    }

    /**
     * Verifies how iterations interact with checkpointing.
     *
     * <p>Three topologies are built on the same environment via {@code createIteration}:
     * <ol>
     *   <li>checkpointing enabled with defaults: executing must throw
     *       {@link UnsupportedOperationException};</li>
     *   <li>exactly-once checkpointing with the third argument {@code false}: executing must
     *       throw {@link UnsupportedOperationException};</li>
     *   <li>exactly-once checkpointing with the third argument {@code true}: building the job
     *       graph must succeed.</li>
     * </ol>
     *
     * <p>The sequence is attempted up to five times, doubling the timeout scale after each
     * failure; the failure of the last attempt is rethrown.
     *
     * @throws Exception the failure of the final attempt
     */
    @Test
    public void testWithCheckPointing() throws Exception {
        final int maxAttempts = 5;
        int scale = 1;
        int attempt = 0;

        while (attempt < maxAttempts) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // 1) default checkpointing settings
                try {
                    createIteration(env, scale);
                    env.execute();

                    // a failed fail() is not swallowed below; it reaches the outer retry handler
                    Assert.fail();
                } catch (UnsupportedOperationException expected) {
                    // expected behaviour
                }

                // 2) checkpointing explicitly configured, last argument false
                try {
                    createIteration(env, scale);
                    env.enableCheckpointing(10L, CheckpointingMode.EXACTLY_ONCE, false);
                    env.execute();

                    Assert.fail();
                } catch (UnsupportedOperationException expected) {
                    // expected behaviour
                }

                // 3) checkpointing explicitly configured, last argument true: only the graph is built
                createIteration(env, scale);
                env.enableCheckpointing(10L, CheckpointingMode.EXACTLY_ONCE, true);
                env.getStreamGraph().getJobGraph();

                break;
            } catch (Throwable failure) {
                LOG.info("Run " + (attempt + 1) + "/" + maxAttempts + " failed", failure);

                if (attempt >= maxAttempts - 1) {
                    throw failure;
                }
                scale *= 2;
            }
            ++attempt;
        }
    }

    /**
     * Enables checkpointing on {@code env} and adds a closed boolean iteration to it.
     *
     * <p>The topology is: {@code parallelism * 2} copies of {@code false}, a pass-through map
     * named "ParallelizeMap", an iteration whose body is {@link IterationHead} fed straight
     * back, and a no-op sink on the feedback stream.
     *
     * @param env environment to add the topology to
     * @param timeoutScale multiplier applied to the 3000 ms base iteration wait time
     */
    private void createIteration(StreamExecutionEnvironment env, int timeoutScale) {
        env.enableCheckpointing();

        DataStream<Boolean> source = env
                .fromCollection(Collections.nCopies(parallelism * 2, false))
                .map(noOpBoolMap)
                .name("ParallelizeMap");

        IterativeStream<Boolean> iteration = source.iterate(3000L * timeoutScale);

        iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink());
    }

    // Creates the shared pass-through functions used to build the test topologies.
    static {
        // Connected-stream pass-through: Integer input is rendered as text, String input is unchanged.
        noOpCoMap = new CoMapFunction<Integer, String, String>() {

            @Override
            public String map1(Integer first) throws Exception {
                return first.toString();
            }

            @Override
            public String map2(String second) throws Exception {
                return second;
            }
        };

        noOpIntMap = new NoOpIntMap();

        // Identity on String records.
        noOpStrMap = new MapFunction<String, String>() {

            @Override
            public String map(String record) throws Exception {
                return record;
            }
        };

        // Connected-stream identity on Integer records for both inputs.
        noOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() {

            @Override
            public Integer map1(Integer first) throws Exception {
                return first;
            }

            @Override
            public Integer map2(Integer second) throws Exception {
                return second;
            }
        };

        // Identity on Boolean records.
        noOpBoolMap = new MapFunction<Boolean, Boolean>() {

            @Override
            public Boolean map(Boolean record) throws Exception {
                return record;
            }
        };
    }

    private static class TestSink
    implements SinkFunction<String> {
        private static final long serialVersionUID = 1L;
        public static List<String> collected = new ArrayList<String>();

        private TestSink() {
        }

        public void invoke(String value) throws Exception {
            collected.add(value);
        }
    }

    private static final class IterationHead
    extends RichFlatMapFunction<Boolean, Boolean> {
        private IterationHead() {
        }

        public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
            int indx = this.getRuntimeContext().getIndexOfThisSubtask();
            if (value.booleanValue()) {
                iterated[indx] = true;
            } else {
                out.collect((Object)true);
            }
        }
    }
}

