/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.iterative.ConnectedComponentsITCase;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ConnectedComponentsWithDeferredUpdateITCase
extends JavaProgramTestBaseJUnit4 {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    private final boolean extraMapper;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    public ConnectedComponentsWithDeferredUpdateITCase(boolean extraMapper) {
        this.extraMapper = extraMapper;
    }

    protected void preSubmit() throws Exception {
        this.verticesPath = this.createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices((int)1000));
        this.edgesPath = this.createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges((int)10000, (int)1000, (long)3287269182979823L));
        this.resultPath = this.getTempFilePath("results");
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource vertices = env.readCsvFile(this.verticesPath).types(Long.class);
        FlatMapOperator edges = env.readCsvFile(this.edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).flatMap((FlatMapFunction)new ConnectedComponents.UndirectEdge());
        MapOperator verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue());
        DeltaIteration iteration = verticesWithInitialId.iterateDelta((DataSet)verticesWithInitialId, 100, new int[]{0});
        JoinOperator.EquiJoin changes = iteration.getWorkset().join((DataSet)edges).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new ConnectedComponents.NeighborWithComponentIDJoin()).groupBy(new int[]{0}).aggregate(Aggregations.MIN, 1).join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with((FlatJoinFunction)new UpdateComponentIdMatchNonPreserving());
        Object delta = this.extraMapper ? changes.map((MapFunction)new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>(){
            private static final long serialVersionUID = -3929364091829757322L;

            public Tuple2<Long, Long> map(Tuple2<Long, Long> v) throws Exception {
                return v;
            }
        }) : changes;
        DataSet result = iteration.closeWith((DataSet)delta, (DataSet)changes);
        result.writeAsCsv(this.resultPath, "\n", " ");
        env.execute("Connected Components Example");
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader reader : TestBaseUtils.getResultReader((String)this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult((BufferedReader)reader);
        }
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        return Arrays.asList({false}, {true});
    }

    private static final class UpdateComponentIdMatchNonPreserving
    implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        private UpdateComponentIdMatchNonPreserving() {
        }

        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> current, Collector<Tuple2<Long, Long>> out) throws Exception {
            if ((Long)candidate.f1 < (Long)current.f1) {
                out.collect(candidate);
            }
        }
    }
}

