/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;

/**
 * Integration test that runs a connected-components program built on a delta iteration.
 *
 * <p>In every superstep only the vertices that are targets of edges leaving a changed vertex
 * (the "candidates") are recomputed. The input graph built in {@code preSubmit()} consists of
 * the vertices 1..9, each initially labelled with its own id, and a fixed list of directed
 * edges. The expected output labels vertices 1-6 with component id 1 and vertices 7-9 with
 * component id 7.
 */
public class DependencyConnectedComponentsITCase extends JavaProgramTestBaseJUnit4 {

    /** Upper bound on the number of supersteps of the delta iteration. */
    private static final int MAX_ITERATIONS = 20;

    /** Parallelism the job is executed with. */
    private static final int PARALLELISM = 1;

    /** Vertex ids are {@code 1..VERTEX_COUNT}; each vertex starts in its own component. */
    private static final long VERTEX_COUNT = 9L;

    /** Directed edges as {@code {source, target}} pairs, in the order they are fed to the job. */
    private static final long[][] EDGES = {
        {1, 2}, {1, 3}, {2, 3}, {2, 4}, {2, 1}, {3, 1}, {3, 2}, {4, 2}, {4, 6},
        {5, 6}, {6, 4}, {6, 5}, {7, 8}, {7, 9}, {8, 7}, {8, 9}, {9, 7}, {9, 8},
    };

    /** Input of the job: {@code (vertexId, componentId)} pairs. Refilled by {@code preSubmit()}. */
    protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<>();

    /** Input of the job: {@code (sourceId, targetId)} pairs. Refilled by {@code preSubmit()}. */
    protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<>();

    /** Location the job writes its text output to. */
    private String resultPath;

    /** Expected job output, one {@code (vertexId,componentId)} tuple per line. */
    private String expectedResult;

    /**
     * Rebuilds the static input collections and sets the result path and the expected output.
     *
     * <p>NOTE(review): presumably called by the {@code JavaProgramTestBaseJUnit4} lifecycle
     * before {@code testProgram()} - confirm against the base class.
     *
     * @throws Exception declared by the signature; the visible code relies on
     *     {@code getTempDirPath} only
     */
    protected void preSubmit() throws Exception {
        // The lists are static, so drop whatever an earlier run left behind.
        verticesInput.clear();
        edgesInput.clear();

        for (long id = 1L; id <= VERTEX_COUNT; id++) {
            verticesInput.add(new Tuple2<>(id, id));
        }
        for (long[] edge : EDGES) {
            edgesInput.add(new Tuple2<>(edge[0], edge[1]));
        }

        this.resultPath = this.getTempDirPath("result");
        this.expectedResult =
                "(1,1)\n"
                        + "(2,1)\n"
                        + "(3,1)\n"
                        + "(4,1)\n"
                        + "(5,1)\n"
                        + "(6,1)\n"
                        + "(7,7)\n"
                        + "(8,7)\n"
                        + "(9,7)\n";
    }

    /**
     * Runs the connected-components job, writing its output to {@code resultPath}.
     *
     * @throws Exception if building or executing the job fails
     */
    protected void testProgram() throws Exception {
        DependencyConnectedComponentsProgram.runProgram(this.resultPath);
    }

    /**
     * Compares the content written to {@code resultPath} with {@code expectedResult}.
     *
     * @throws Exception if the comparison helper fails
     */
    protected void postSubmit() throws Exception {
        TestBaseUtils.compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
    }

    /** Holder of the static method that assembles and executes the job. */
    private static final class DependencyConnectedComponentsProgram {

        private DependencyConnectedComponentsProgram() {
        }

        /**
         * Builds the delta iteration over {@code verticesInput} and {@code edgesInput}, writes the
         * final solution set as text and executes the job.
         *
         * @param resultPath location the result is written to
         * @return {@code resultPath}, unchanged
         * @throws Exception if job execution fails
         */
        public static String runProgram(String resultPath) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(PARALLELISM);

            DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
            DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
            int keyPosition = 0;

            // The initial vertices serve both as the solution set and as the first workset.
            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    initialSolutionSet.iterateDelta(initialSolutionSet, MAX_ITERATIONS, keyPosition);

            // Candidates: targets of edges that start at a workset vertex, without duplicates.
            DataSet<Long> candidates =
                    iteration
                            .getWorkset()
                            .join(edges)
                            .where(0)
                            .equalTo(0)
                            .with(new FindCandidatesJoin())
                            .groupBy(
                                    new KeySelector<Long, Long>() {

                                        public Long getKey(Long id) {
                                            return id;
                                        }
                                    })
                            .reduceGroup(new RemoveDuplicatesReduce());

            // Dependencies: every edge whose target (field 1) is one of the candidates.
            DataSet<Tuple2<Long, Long>> candidatesDependencies =
                    candidates
                            .join(edges)
                            .where(
                                    new KeySelector<Long, Long>() {

                                        public Long getKey(Long id) {
                                            return id;
                                        }
                                    })
                            .equalTo(
                                    new KeySelector<Tuple2<Long, Long>, Long>() {

                                        public Long getKey(Tuple2<Long, Long> vertexWithId) {
                                            return vertexWithId.f1;
                                        }
                                    })
                            .with(new FindCandidatesDependenciesJoin());

            // Attach the component id of each edge source to the edge target, then keep the
            // minimum component id offered to every target.
            DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
                    candidatesDependencies
                            .join(iteration.getSolutionSet())
                            .where(0)
                            .equalTo(0)
                            .with(new NeighborWithComponentIDJoin())
                            .groupBy(0)
                            .reduceGroup(new MinimumReduce());

            // Keep only the vertices whose new component id is smaller than the stored one.
            DataSet<Tuple2<Long, Long>> updatedComponentId =
                    verticesWithNewComponents
                            .join(iteration.getSolutionSet())
                            .where(0)
                            .equalTo(0)
                            .flatMap(new MinimumIdFilter());

            // The updates are both the solution-set delta and the next workset.
            iteration.closeWith(updatedComponentId, updatedComponentId).writeAsText(resultPath);

            env.execute();
            return resultPath;
        }
    }

    /** Joins a {@code (vertexId, componentId)} pair with an edge and emits the edge's field 1. */
    private static final class FindCandidatesJoin
            extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
        private static final long serialVersionUID = 1L;

        public Long join(Tuple2<Long, Long> vertexWithCompId, Tuple2<Long, Long> edge) throws Exception {
            return edge.f1;
        }
    }

    /** Emits exactly one value per group: the first element of the group. */
    private static final class RemoveDuplicatesReduce
            extends RichGroupReduceFunction<Long, Long> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Long> values, Collector<Long> out) {
            out.collect(values.iterator().next());
        }
    }

    /** Joins a candidate id with an edge and passes the edge through unchanged. */
    private static final class FindCandidatesDependenciesJoin
            extends RichJoinFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Long, Long> join(Long candidateId, Tuple2<Long, Long> edge) throws Exception {
            return edge;
        }
    }

    /**
     * Joins an edge with a {@code (vertexId, componentId)} pair and replaces the pair's field 0
     * with the edge's field 1, leaving the component id untouched.
     */
    private static final class NeighborWithComponentIDJoin
            extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Long, Long> join(Tuple2<Long, Long> edge, Tuple2<Long, Long> vertexWithCompId) throws Exception {
            // The second input is modified in place and returned; no new tuple is allocated.
            vertexWithCompId.setField(edge.f1, 0);
            return vertexWithCompId;
        }
    }

    /**
     * Emits one {@code (vertexId, minComponentId)} tuple per group, where the vertex id is taken
     * from the last element seen and the component id is the smallest field 1 in the group.
     */
    private static final class MinimumReduce
            extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        /** Output tuple that is reused for every call to {@code reduce}. */
        final Tuple2<Long, Long> resultVertex = new Tuple2<>();

        public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
            Long vertexId = 0L;
            long minimumCompId = Long.MAX_VALUE;

            for (Tuple2<Long, Long> value : values) {
                vertexId = value.f0;
                minimumCompId = Math.min(minimumCompId, value.f1);
            }

            this.resultVertex.f0 = vertexId;
            this.resultVertex.f1 = minimumCompId;
            out.collect(this.resultVertex);
        }
    }

    /**
     * Receives a pair of tuples (field 0: vertex with new component id, field 1: vertex with the
     * currently stored component id) and forwards the new one only if its component id is
     * strictly smaller.
     */
    private static final class MinimumIdFilter
            extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId, Collector<Tuple2<Long, Long>> out) {
            Tuple2<Long, Long> updated = vertexWithNewAndOldId.f0;
            Tuple2<Long, Long> current = vertexWithNewAndOldId.f1;
            if (updated.f1 < current.f1) {
                out.collect(updated);
            }
        }
    }
}

