/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.examples;

import java.util.Collection;
import java.util.HashSet;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData;

public class JaccardSimilarityMeasure
implements ProgramDescription {
    private static boolean fileOutput = false;
    private static String edgeInputPath = null;
    private static String outputPath = null;

    public static void main(String[] args) throws Exception {
        if (!JaccardSimilarityMeasure.parseParameters(args)) {
            return;
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Edge<Long, Double>> edges = JaccardSimilarityMeasure.getEdgesDataSet(env);
        Graph graph = Graph.fromDataSet(edges, (MapFunction)new MapFunction<Long, HashSet<Long>>(){

            public HashSet<Long> map(Long id) throws Exception {
                HashSet<Long> neighbors = new HashSet<Long>();
                neighbors.add(id);
                return new HashSet<Long>(neighbors);
            }
        }, (ExecutionEnvironment)env);
        DataSet computedNeighbors = graph.reduceOnNeighbors((ReduceNeighborsFunction)new GatherNeighbors(), EdgeDirection.ALL);
        Graph graphWithVertexValues = graph.joinWithVertices(computedNeighbors, (VertexJoinFunction)new VertexJoinFunction<HashSet<Long>, HashSet<Long>>(){

            public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) {
                return inputValue;
            }
        });
        MapOperator edgesWithJaccardValues = graphWithVertexValues.getTriplets().map((MapFunction)new ComputeJaccard());
        if (fileOutput) {
            edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ",");
            env.execute("Executing Jaccard Similarity Measure");
        } else {
            edgesWithJaccardValues.print();
        }
    }

    public String getDescription() {
        return "Vertex Jaccard Similarity Measure";
    }

    private static boolean parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length != 2) {
                System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
                return false;
            }
            fileOutput = true;
            edgeInputPath = args[0];
            outputPath = args[1];
        } else {
            System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
            System.out.println("Provide parameters to read input data from files.");
            System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
        }
        return true;
    }

    private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
        if (fileOutput) {
            return env.readCsvFile(edgeInputPath).ignoreComments("#").fieldDelimiter("\t").lineDelimiter("\n").types(Long.class, Long.class).map((MapFunction)new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>(){

                public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
                    return new Edge(tuple2.f0, tuple2.f1, (Object)new Double(0.0));
                }
            });
        }
        return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
    }

    private static final class ComputeJaccard
    implements MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> {
        private ComputeJaccard() {
        }

        public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception {
            Vertex srcVertex = triplet.getSrcVertex();
            Vertex trgVertex = triplet.getTrgVertex();
            Long x = (Long)srcVertex.getId();
            Long y = (Long)trgVertex.getId();
            HashSet neighborSetY = (HashSet)trgVertex.getValue();
            double unionPlusIntersection = ((HashSet)srcVertex.getValue()).size() + neighborSetY.size();
            HashSet unionSet = new HashSet();
            unionSet.addAll((Collection)srcVertex.getValue());
            unionSet.addAll(neighborSetY);
            double union = unionSet.size();
            double intersection = unionPlusIntersection - union;
            return new Edge((Object)x, (Object)y, (Object)(intersection / union));
        }
    }

    private static final class GatherNeighbors
    implements ReduceNeighborsFunction<HashSet<Long>> {
        private GatherNeighbors() {
        }

        public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) {
            first.addAll(second);
            return new HashSet<Long>(first);
        }
    }
}

