/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
import org.junit.Assert;
import org.junit.Test;

public class StreamingOperatorsITCase
extends AbstractTestBase {
    /**
     * Tests a fold on a keyed stream.
     *
     * <p>The job keys the tuples emitted by {@link TupleSource} on field 0, folds field 1 into a
     * running sum, tags every sum with a key, splits the stream on that key and writes the two
     * branches into separate {@link MemorySinkFunction} instances. After the job has finished, the
     * sorted content of each sink must equal the running sums computed locally for the same
     * {@code murmurHash(i) % numKeys} partitioning. Results are sorted before comparison, so the
     * arrival order at the sinks is not asserted.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testGroupedFoldOperation() throws Exception {
        final int numElements = 10;
        final int numKeys = 2;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, Integer>> sourceStream =
                env.addSource(new TupleSource(numElements, numKeys));

        SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
                .keyBy(0)
                .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
                    private static final long serialVersionUID = 4875723041825726082L;

                    @Override
                    public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
                        return accumulator + value.f1;
                    }
                })
                .map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
                    private static final long serialVersionUID = 8538355101606319744L;

                    // Derived from the first value this instance sees and then reused for every
                    // later value.
                    // NOTE(review): this only yields correct tags if a single function instance
                    // never receives sums belonging to different keys - confirm against the
                    // key-to-subtask assignment.
                    int key = -1;

                    @Override
                    public Tuple2<Integer, Integer> map(Integer value) throws Exception {
                        if (key == -1) {
                            key = MathUtils.murmurHash(value) % numKeys;
                        }
                        return new Tuple2<Integer, Integer>(key, value);
                    }
                })
                .split(new OutputSelector<Tuple2<Integer, Integer>>() {
                    private static final long serialVersionUID = -8439325199163362470L;

                    @Override
                    public Iterable<String> select(Tuple2<Integer, Integer> value) {
                        // The output name is the textual form of the key tag ("0" or "1").
                        return Collections.singletonList(String.valueOf(value.f0));
                    }
                });

        final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
        final ArrayList<Integer> actualResult1 = new ArrayList<Integer>();
        final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
        final ArrayList<Integer> actualResult2 = new ArrayList<Integer>();

        try {
            MemorySinkFunction.registerCollection(0, actualResult1);
            MemorySinkFunction.registerCollection(1, actualResult2);

            splittedResult.select("0").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
                private static final long serialVersionUID = 2114608668010092995L;

                @Override
                public Integer map(Tuple2<Integer, Integer> value) throws Exception {
                    return value.f1;
                }
            }).addSink(sinkFunction1);

            splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
                private static final long serialVersionUID = 5631104389744681308L;

                @Override
                public Integer map(Tuple2<Integer, Integer> value) throws Exception {
                    return value.f1;
                }
            }).addSink(sinkFunction2);

            // Locally compute the running sums each branch is expected to produce.
            ArrayList<Integer> expected1 = new ArrayList<Integer>(numElements);
            ArrayList<Integer> expected2 = new ArrayList<Integer>(numElements);
            int counter1 = 0;
            int counter2 = 0;
            for (int i = 0; i < numElements; i++) {
                if (MathUtils.murmurHash(i) % numKeys == 0) {
                    counter1 += i;
                    expected1.add(counter1);
                } else {
                    counter2 += i;
                    expected2.add(counter2);
                }
            }

            env.execute();

            Collections.sort(actualResult1);
            Collections.sort(actualResult2);
            Assert.assertEquals(expected1, actualResult1);
            Assert.assertEquals(expected2, actualResult2);
        } finally {
            // Always reset the static registry so a failure here cannot leak into other tests.
            MemorySinkFunction.clear();
        }
    }

    /**
     * Tests a keyed fold whose accumulator type, {@link NonSerializable}, does not implement
     * {@code java.io.Serializable}.
     *
     * <p>The source emits {@code (i, NonSerializable(i))} for {@code i} in
     * {@code [0, numElements)}. The stream is keyed on field 0 and folded starting from
     * {@code NonSerializable(42)}, and the folded values are collected in a
     * {@link MemorySinkFunction}. Every key occurs once in the source, so the sorted result is
     * expected to be {@code 42 + i} for each {@code i}.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testFoldOperationWithNonJavaSerializableType() throws Exception {
        final int numElements = 10;
        final int initialValue = 42;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, NonSerializable>> input =
                env.addSource(new NonSerializableTupleSource(numElements));

        final MemorySinkFunction sinkFunction = new MemorySinkFunction(0);
        final ArrayList<Integer> actualResult = new ArrayList<Integer>();

        try {
            MemorySinkFunction.registerCollection(0, actualResult);

            input
                    .keyBy(0)
                    .fold(
                            new NonSerializable(initialValue),
                            new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
                                private static final long serialVersionUID = 2705497830143608897L;

                                @Override
                                public NonSerializable fold(
                                        NonSerializable accumulator,
                                        Tuple2<Integer, NonSerializable> value) throws Exception {
                                    return new NonSerializable(accumulator.value + value.f1.value);
                                }
                            })
                    .map(new MapFunction<NonSerializable, Integer>() {
                        private static final long serialVersionUID = 6906984044674568945L;

                        @Override
                        public Integer map(NonSerializable value) throws Exception {
                            return value.value;
                        }
                    })
                    .addSink(sinkFunction);

            ArrayList<Integer> expected = new ArrayList<Integer>(numElements);
            for (int i = 0; i < numElements; i++) {
                expected.add(initialValue + i);
            }

            env.execute();

            // Arrival order at the sink is not asserted; compare sorted content only.
            Collections.sort(actualResult);
            Assert.assertEquals(expected, actualResult);
        } finally {
            // Always reset the static registry so a failure here cannot leak into other tests.
            MemorySinkFunction.clear();
        }
    }

    /**
     * Tests the ordered and unordered async wait operators on the same input.
     *
     * <p>The async function doubles field 0 of each input tuple on a thread of its own executor
     * and completes the result future with that single value. The ordered pipeline (operator and
     * sink both at parallelism 1) must deliver {@code 0, 2, 4, ...} in exactly that order; the
     * unordered pipeline is compared after sorting.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testAsyncWaitOperator() throws Exception {
        final int numElements = 5;
        final long timeout = 1000L;
        final int capacity = 2;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, NonSerializable>> input =
                env.addSource(new NonSerializableTupleSource(numElements));

        RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function =
                new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
                    private static final long serialVersionUID = 7000343199829487985L;

                    // Created in open() and therefore excluded from serialization of the function.
                    transient ExecutorService executorService;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        executorService = Executors.newFixedThreadPool(numElements);
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        executorService.shutdownNow();
                    }

                    @Override
                    public void asyncInvoke(
                            final Tuple2<Integer, NonSerializable> input,
                            final ResultFuture<Integer> resultFuture) throws Exception {
                        executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                resultFuture.complete(Collections.singletonList(input.f0 + input.f0));
                            }
                        });
                    }
                };

        final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
        final ArrayList<Integer> actualResult1 = new ArrayList<Integer>(numElements);
        final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
        final ArrayList<Integer> actualResult2 = new ArrayList<Integer>(numElements);

        try {
            MemorySinkFunction.registerCollection(0, actualResult1);
            MemorySinkFunction.registerCollection(1, actualResult2);

            // Ordered variant: operator and sink are pinned to parallelism 1 so that the
            // collected list can be compared without sorting.
            SingleOutputStreamOperator<Integer> orderedResult = AsyncDataStream
                    .orderedWait(input, function, timeout, TimeUnit.MILLISECONDS, capacity)
                    .setParallelism(1);
            orderedResult.addSink(sinkFunction1).setParallelism(1);

            SingleOutputStreamOperator<Integer> unorderedResult = AsyncDataStream
                    .unorderedWait(input, function, timeout, TimeUnit.MILLISECONDS, capacity);
            unorderedResult.addSink(sinkFunction2);

            ArrayList<Integer> expected = new ArrayList<Integer>(numElements);
            for (int i = 0; i < numElements; i++) {
                expected.add(i + i);
            }

            env.execute();

            Assert.assertEquals(expected, actualResult1);

            Collections.sort(actualResult2);
            Assert.assertEquals(expected, actualResult2);
        } finally {
            // Always reset the static registry so a failure here cannot leak into other tests.
            MemorySinkFunction.clear();
        }
    }

    /**
     * Runs a job with object reuse enabled in which a flat map emits records but has no
     * downstream operator attached.
     *
     * <p>There are no assertions; the test passes when {@code env.execute()} returns without
     * throwing.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();

        DataStreamSource<Integer> input = env.fromElements(1, 2, 3);

        // The result of flatMap is intentionally left unconsumed.
        input.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                out.collect(value << 1);
            }
        });

        env.execute();
    }

    private static class MemorySinkFunction
    implements SinkFunction<Integer> {
        private static Map<Integer, Collection<Integer>> collections = new ConcurrentHashMap<Integer, Collection<Integer>>();
        private static final long serialVersionUID = -8815570195074103860L;
        private final int key;

        public MemorySinkFunction(int key) {
            this.key = key;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(Integer value) throws Exception {
            Collection<Integer> collection;
            Collection<Integer> collection2 = collection = collections.get(this.key);
            synchronized (collection2) {
                collection.add(value);
            }
        }

        public static void registerCollection(int key, Collection<Integer> collection) {
            collections.put(key, collection);
        }

        public static void clear() {
            collections.clear();
        }
    }

    private static class TupleSource
    implements SourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = -8110466235852024821L;
        private final int numElements;
        private final int numKeys;

        public TupleSource(int numElements, int numKeys) {
            this.numElements = numElements;
            this.numKeys = numKeys;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            for (int i = 0; i < this.numElements; ++i) {
                Tuple2 result = new Tuple2((Object)(1 + MathUtils.murmurHash((int)i) % this.numKeys), (Object)i);
                ctx.collect((Object)result);
            }
        }

        public void cancel() {
        }
    }

    private static class NonSerializableTupleSource
    implements SourceFunction<Tuple2<Integer, NonSerializable>> {
        private static final long serialVersionUID = 3949171986015451520L;
        private final int numElements;

        public NonSerializableTupleSource(int numElements) {
            this.numElements = numElements;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
            for (int i = 0; i < this.numElements; ++i) {
                ctx.collect((Object)new Tuple2((Object)i, (Object)new NonSerializable(i)));
            }
        }

        public void cancel() {
        }
    }

    /**
     * Immutable holder for an {@code int} that deliberately does not implement
     * {@code java.io.Serializable}. Besides the value it carries a plain {@link Object} field that
     * is assigned on construction and never read in this file.
     */
    private static class NonSerializable {
        /** Opaque payload; a fresh instance is created for every holder. */
        private final Object obj;
        /** The wrapped integer. */
        private final int value;

        /**
         * @param value the integer to wrap
         */
        public NonSerializable(int value) {
            this.obj = new Object();
            this.value = value;
        }
    }
}

