/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class WindowFoldITCase
extends AbstractTestBase {
    private static List<String> testResults;

    /**
     * Folds a keyed stream over tumbling event-time windows of 3 ms and checks the emitted values.
     *
     * <p>The source emits nine {@code (key, n)} pairs; {@link Tuple2TimestampExtractor} uses
     * {@code n} as the event timestamp. Every fold starts from {@code ("R:", 0)}, appends the key
     * of each element to the string and adds {@code n} to the running sum. The sink records the
     * {@code toString()} form of each result in the static {@code testResults} list, which is
     * compared (order-insensitively, via sorting) with the expected values.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testFoldWindow() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> source1 = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx)
                            throws Exception {
                        ctx.collect(Tuple2.of("a", 0));
                        ctx.collect(Tuple2.of("a", 1));
                        ctx.collect(Tuple2.of("a", 2));

                        ctx.collect(Tuple2.of("b", 3));
                        ctx.collect(Tuple2.of("b", 4));
                        ctx.collect(Tuple2.of("b", 5));

                        ctx.collect(Tuple2.of("a", 6));
                        ctx.collect(Tuple2.of("a", 7));
                        ctx.collect(Tuple2.of("a", 8));
                    }

                    @Override
                    public void cancel() {
                        // Nothing to do: run() emits a fixed set of elements and returns.
                    }
                })
                .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

        source1
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    @Override
                    public Tuple2<String, Integer> fold(
                            Tuple2<String, Integer> accumulator,
                            Tuple2<String, Integer> value) throws Exception {
                        // The accumulator is updated in place and handed back.
                        accumulator.f0 += value.f0;
                        accumulator.f1 += value.f1;
                        return accumulator;
                    }
                })
                .addSink(new SinkFunction<Tuple2<String, Integer>>() {

                    @Override
                    public void invoke(Tuple2<String, Integer> value) throws Exception {
                        testResults.add(value.toString());
                    }
                });

        env.execute("Fold Window Test");

        // a:0+1+2 = 3, b:3+4+5 = 12, a:6+7+8 = 21
        List<String> expectedResult = Arrays.asList("(R:aaa,3)", "(R:aaa,21)", "(R:bbb,12)");

        // Sort both sides so the comparison does not depend on emission order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    /**
     * Folds a keyed stream over tumbling event-time windows of 3 ms and post-processes each folded
     * value with a {@link ProcessWindowFunction}.
     *
     * <p>The fold accumulator is a {@code (sum, string)} pair starting at {@code (0, "R:")}: the
     * key of every element is appended to the string and its integer is added to the sum. The
     * window function then emits {@code (string, sum, index)} for every accumulator it is handed,
     * where {@code index} counts the accumulators within that invocation. The sink records the
     * {@code toString()} form of each result in the static {@code testResults} list, which is
     * compared (order-insensitively, via sorting) with the expected values.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testFoldProcessWindow() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> source1 = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx)
                            throws Exception {
                        ctx.collect(Tuple2.of("a", 0));
                        ctx.collect(Tuple2.of("a", 1));
                        ctx.collect(Tuple2.of("a", 2));

                        ctx.collect(Tuple2.of("b", 3));
                        ctx.collect(Tuple2.of("b", 4));
                        ctx.collect(Tuple2.of("b", 5));

                        ctx.collect(Tuple2.of("a", 6));
                        ctx.collect(Tuple2.of("a", 7));
                        ctx.collect(Tuple2.of("a", 8));
                    }

                    @Override
                    public void cancel() {
                        // Nothing to do: run() emits a fixed set of elements and returns.
                    }
                })
                .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

        source1
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .fold(
                        Tuple2.of(0, "R:"),
                        new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {

                            @Override
                            public Tuple2<Integer, String> fold(
                                    Tuple2<Integer, String> accumulator,
                                    Tuple2<String, Integer> value) throws Exception {
                                // Note the swapped layout: accumulator is (sum, string),
                                // input is (key, number).
                                accumulator.f1 += value.f0;
                                accumulator.f0 += value.f1;
                                return accumulator;
                            }
                        },
                        new ProcessWindowFunction<
                                Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, Tuple, TimeWindow>() {

                            @Override
                            public void process(
                                    Tuple tuple,
                                    Context context,
                                    Iterable<Tuple2<Integer, String>> elements,
                                    Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                                int i = 0;
                                for (Tuple2<Integer, String> in : elements) {
                                    out.collect(new Tuple3<String, Integer, Integer>(in.f1, in.f0, i++));
                                }
                            }
                        })
                .addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {

                    @Override
                    public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
                        testResults.add(value.toString());
                    }
                });

        env.execute("Fold Process Window Test");

        // a:0+1+2 = 3, b:3+4+5 = 12, a:6+7+8 = 21; the trailing 0 is the per-invocation index.
        List<String> expectedResult = Arrays.asList("(R:aaa,3,0)", "(R:aaa,21,0)", "(R:bbb,12,0)");

        // Sort both sides so the comparison does not depend on emission order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    /**
     * Folds a non-keyed stream ({@code windowAll}) over tumbling event-time windows of 3 ms and
     * checks the emitted values.
     *
     * <p>After the first three {@code "a"} elements the source alternates keys {@code "b"} and
     * {@code "a"} with equal numbers, so the expected second result {@code (R:bababa,24)} mixes
     * both keys in arrival order. Every fold starts from {@code ("R:", 0)}, appends the key of each
     * element to the string and adds its integer to the sum. The sink records the
     * {@code toString()} form of each result in the static {@code testResults} list, which is
     * compared (order-insensitively, via sorting) with the expected values.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testFoldAllWindow() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> source1 = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx)
                            throws Exception {
                        ctx.collect(Tuple2.of("a", 0));
                        ctx.collect(Tuple2.of("a", 1));
                        ctx.collect(Tuple2.of("a", 2));

                        // Keys interleave from here on; pairs share the same number.
                        ctx.collect(Tuple2.of("b", 3));
                        ctx.collect(Tuple2.of("a", 3));
                        ctx.collect(Tuple2.of("b", 4));
                        ctx.collect(Tuple2.of("a", 4));
                        ctx.collect(Tuple2.of("b", 5));
                        ctx.collect(Tuple2.of("a", 5));
                    }

                    @Override
                    public void cancel() {
                        // Nothing to do: run() emits a fixed set of elements and returns.
                    }
                })
                .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

        source1
                .windowAll(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    @Override
                    public Tuple2<String, Integer> fold(
                            Tuple2<String, Integer> accumulator,
                            Tuple2<String, Integer> value) throws Exception {
                        // The accumulator is updated in place and handed back.
                        accumulator.f0 += value.f0;
                        accumulator.f1 += value.f1;
                        return accumulator;
                    }
                })
                .addSink(new SinkFunction<Tuple2<String, Integer>>() {

                    @Override
                    public void invoke(Tuple2<String, Integer> value) throws Exception {
                        testResults.add(value.toString());
                    }
                });

        env.execute("Fold All-Window Test");

        // 0+1+2 = 3 and 3+3+4+4+5+5 = 24
        List<String> expectedResult = Arrays.asList("(R:aaa,3)", "(R:bababa,24)");

        // Sort both sides so the comparison does not depend on emission order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    /**
     * Folds a non-keyed stream ({@code windowAll}) over tumbling event-time windows of 3 ms and
     * post-processes each folded value with a {@link ProcessAllWindowFunction}.
     *
     * <p>The input is the same interleaved sequence that {@code testFoldAllWindow} uses: after the
     * first three {@code "a"} elements, keys {@code "b"} and {@code "a"} alternate with equal
     * numbers. The expected second result therefore mixes both keys, which a per-key window would
     * not produce. The fold accumulator is a {@code (sum, string)} pair starting at
     * {@code (0, "R:")}; the window function emits {@code (string, sum, index)} for every
     * accumulator it is handed, where {@code index} counts the accumulators within that
     * invocation. The sink records the {@code toString()} form of each result in the static
     * {@code testResults} list, which is compared (order-insensitively, via sorting) with the
     * expected values.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void testFoldProcessAllWindow() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> source1 = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx)
                            throws Exception {
                        ctx.collect(Tuple2.of("a", 0));
                        ctx.collect(Tuple2.of("a", 1));
                        ctx.collect(Tuple2.of("a", 2));

                        // Keys interleave from here on so that an all-window result is
                        // distinguishable from a per-key one.
                        ctx.collect(Tuple2.of("b", 3));
                        ctx.collect(Tuple2.of("a", 3));
                        ctx.collect(Tuple2.of("b", 4));
                        ctx.collect(Tuple2.of("a", 4));
                        ctx.collect(Tuple2.of("b", 5));
                        ctx.collect(Tuple2.of("a", 5));
                    }

                    @Override
                    public void cancel() {
                        // Nothing to do: run() emits a fixed set of elements and returns.
                    }
                })
                .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

        source1
                .windowAll(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .fold(
                        Tuple2.of(0, "R:"),
                        new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {

                            @Override
                            public Tuple2<Integer, String> fold(
                                    Tuple2<Integer, String> accumulator,
                                    Tuple2<String, Integer> value) throws Exception {
                                // Note the swapped layout: accumulator is (sum, string),
                                // input is (key, number).
                                accumulator.f1 += value.f0;
                                accumulator.f0 += value.f1;
                                return accumulator;
                            }
                        },
                        new ProcessAllWindowFunction<
                                Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, TimeWindow>() {

                            @Override
                            public void process(
                                    Context context,
                                    Iterable<Tuple2<Integer, String>> elements,
                                    Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                                int i = 0;
                                for (Tuple2<Integer, String> in : elements) {
                                    out.collect(new Tuple3<String, Integer, Integer>(in.f1, in.f0, i++));
                                }
                            }
                        })
                .addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {

                    @Override
                    public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
                        testResults.add(value.toString());
                    }
                });

        env.execute("Fold Process Window Test");

        // 0+1+2 = 3 and 3+3+4+4+5+5 = 24; the trailing 0 is the per-invocation index.
        List<String> expectedResult = Arrays.asList("(R:aaa,3,0)", "(R:bababa,24,0)");

        // Sort both sides so the comparison does not depend on emission order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    private static class Tuple2TimestampExtractor
    implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
        private Tuple2TimestampExtractor() {
        }

        public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
            return ((Integer)element.f1).intValue();
        }

        public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> lastElement, long extractedTimestamp) {
            return new Watermark((long)((Integer)lastElement.f1 - 1));
        }
    }
}

