/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class DataStreamPojoITCase
extends AbstractTestBaseJUnit4 {
    static List<Data> elements = new ArrayList<Data>();

    @Test
    public void testCompositeKeyOnNestedPojo() throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.getConfig().disableObjectReuse();
        see.setParallelism(3);
        DataStreamSource dataStream = see.fromData(elements);
        SingleOutputStreamOperator summedStream = dataStream.keyBy(new String[]{"aaa", "abc", "wxyz"}).sum("sum").keyBy(new String[]{"aaa", "abc", "wxyz"}).flatMap((FlatMapFunction)new FlatMapFunction<Data, Data>(){
            private static final long serialVersionUID = 788865239171396315L;
            Data[] first = new Data[3];

            public void flatMap(Data value, Collector<Data> out) throws Exception {
                if (this.first[value.aaa] == null) {
                    this.first[value.aaa] = value;
                    if (value.sum != 1) {
                        throw new RuntimeException("Expected the sum to be one");
                    }
                } else {
                    if (value.sum != 2) {
                        throw new RuntimeException("Expected the sum to be two");
                    }
                    if (this.first[value.aaa].aaa != value.aaa) {
                        throw new RuntimeException("aaa key wrong");
                    }
                    if (this.first[value.aaa].abc != value.abc) {
                        throw new RuntimeException("abc key wrong");
                    }
                    if (this.first[value.aaa].wxyz != value.wxyz) {
                        throw new RuntimeException("wxyz key wrong");
                    }
                }
            }
        });
        summedStream.print();
        see.execute();
    }

    @Test
    public void testNestedKeyOnNestedPojo() throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.getConfig().disableObjectReuse();
        see.setParallelism(4);
        DataStreamSource dataStream = see.fromData(elements);
        SingleOutputStreamOperator summedStream = dataStream.keyBy(new String[]{"aaa", "stats.count"}).sum("sum").keyBy(new String[]{"aaa", "stats.count"}).flatMap((FlatMapFunction)new FlatMapFunction<Data, Data>(){
            private static final long serialVersionUID = -3678267280397950258L;
            Data[] first = new Data[3];

            public void flatMap(Data value, Collector<Data> out) throws Exception {
                if (value.stats.count != 123L) {
                    throw new RuntimeException("Wrong value for value.stats.count");
                }
                if (this.first[value.aaa] == null) {
                    this.first[value.aaa] = value;
                    if (value.sum != 1) {
                        throw new RuntimeException("Expected the sum to be one");
                    }
                } else {
                    if (value.sum != 2) {
                        throw new RuntimeException("Expected the sum to be two");
                    }
                    if (this.first[value.aaa].aaa != value.aaa) {
                        throw new RuntimeException("aaa key wrong");
                    }
                    if (this.first[value.aaa].abc != value.abc) {
                        throw new RuntimeException("abc key wrong");
                    }
                    if (this.first[value.aaa].wxyz != value.wxyz) {
                        throw new RuntimeException("wxyz key wrong");
                    }
                }
            }
        });
        summedStream.print();
        see.execute();
    }

    @Test
    public void testNestedPojoFieldAccessor() throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.getConfig().disableObjectReuse();
        see.setParallelism(4);
        DataStreamSource dataStream = see.fromData(elements);
        SingleOutputStreamOperator summedStream = dataStream.keyBy(new String[]{"aaa"}).sum("stats.count").keyBy(new String[]{"aaa"}).flatMap((FlatMapFunction)new FlatMapFunction<Data, Data>(){
            Data[] first = new Data[3];

            public void flatMap(Data value, Collector<Data> out) throws Exception {
                if (this.first[value.aaa] == null) {
                    this.first[value.aaa] = value;
                    if (value.stats.count != 123L) {
                        throw new RuntimeException("Expected stats.count to be 123");
                    }
                } else if (value.stats.count != 246L) {
                    throw new RuntimeException("Expected stats.count to be 2 * 123");
                }
            }
        });
        summedStream.print();
        see.execute();
    }

    @Test(expected=CompositeType.InvalidFieldReferenceException.class)
    public void testFailOnNestedPojoFieldAccessor() throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource dataStream = see.fromData(elements);
        dataStream.keyBy(new String[]{"aaa", "stats.count"}).sum("stats.nonExistingField");
    }

    static {
        elements.add(new Data(0, 0, 0));
        elements.add(new Data(0, 0, 0));
        elements.add(new Data(1, 1, 1));
        elements.add(new Data(1, 1, 1));
        elements.add(new Data(2, 2, 3));
        elements.add(new Data(2, 2, 3));
    }

    public static class Stats {
        public long count;
        public float a;
        public float b;
        public float c;
        public float d;
        public float e;
    }

    public static class Policy {
        public short a;
        public short b;
        public boolean c;
        public boolean d;
    }

    public static class Data {
        public int sum;
        public int aaa;
        public int abc;
        public long wxyz;
        public int t1;
        public int t2;
        public Policy policy;
        public Stats stats;

        public Data() {
        }

        public Data(int aaa, int abc, int wxyz) {
            this.sum = 1;
            this.aaa = aaa;
            this.abc = abc;
            this.wxyz = wxyz;
            this.stats = new Stats();
            this.stats.count = 123L;
        }

        public String toString() {
            return "Data{sum=" + this.sum + ", aaa=" + this.aaa + ", abc=" + this.abc + ", wxyz=" + this.wxyz + '}';
        }
    }
}

