/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.CoordVector;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.test.util.PointFormatter;
import org.apache.flink.test.util.PointInFormat;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;

public class IterationWithChainingITCase
extends JavaProgramTestBaseJUnit4 {
    private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n1|73.65|61.76|62.89|\n2|61.73|49.95|92.74|\n";
    private String dataPath;
    private String resultPath;

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("data_points.txt", DATA_POINTS);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        Operator initialInput = ((DataSource)env.readFile((FileInputFormat)new PointInFormat(), this.dataPath).setParallelism(1)).name("Input");
        IterativeDataSet iteration = (IterativeDataSet)initialInput.iterate(2).name("Loop");
        MapOperator identity = iteration.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>(){

            public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception {
                for (Tuple2<Integer, CoordVector> value : values) {
                    out.collect(value);
                }
            }
        }).map((MapFunction)new MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>(){

            public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> value) throws Exception {
                return value;
            }
        });
        iteration.closeWith((DataSet)identity).writeAsFormattedText(this.resultPath, (TextOutputFormat.TextFormatter)new PointFormatter());
        env.execute("Iteration with chained map test");
        TestBaseUtils.compareResultsByLinesInMemory((String)DATA_POINTS, (String)this.resultPath);
    }
}

