/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.examples;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.data.IncrementalSSSPData;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;

public class IncrementalSSSP
implements ProgramDescription {
    private static boolean fileOutput = false;
    private static String verticesInputPath = null;
    private static String edgesInputPath = null;
    private static String edgesInSSSPInputPath = null;
    private static Long srcEdgeToBeRemoved = null;
    private static Long trgEdgeToBeRemoved = null;
    private static Double valEdgeToBeRemoved = null;
    private static String outputPath = null;
    private static int maxIterations = 5;

    public static void main(String[] args) throws Exception {
        if (!IncrementalSSSP.parseParameters(args)) {
            return;
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Edge<Long, Double> edgeToBeRemoved = IncrementalSSSP.getEdgeToBeRemoved();
        Graph<Long, Double, Double> graph = IncrementalSSSP.getGraph(env);
        Graph<Long, Double, Double> ssspGraph = IncrementalSSSP.getSSSPGraph(env);
        graph.removeEdge(edgeToBeRemoved);
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        if (IncrementalSSSP.isInSSSP(edgeToBeRemoved, (DataSet<Edge<Long, Double>>)ssspGraph.getEdges())) {
            parameters.setDirection(EdgeDirection.IN);
            parameters.setOptDegrees(true);
            Graph result = ssspGraph.runScatterGatherIteration((VertexUpdateFunction)new VertexDistanceUpdater(), (MessagingFunction)new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
            DataSet resultedVertices = result.getVertices();
            if (fileOutput) {
                resultedVertices.writeAsCsv(outputPath, "\n", ",");
                env.execute("Incremental SSSP Example");
            } else {
                resultedVertices.print();
            }
        } else if (fileOutput) {
            graph.getVertices().writeAsCsv(outputPath, "\n", ",");
            env.execute("Incremental SSSP Example");
        } else {
            graph.getVertices().print();
        }
    }

    public String getDescription() {
        return "Incremental Single Sink Shortest Paths Example";
    }

    public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
        return edgesInSSSP.filter((FilterFunction)new FilterFunction<Edge<Long, Double>>(){

            public boolean filter(Edge<Long, Double> edge) throws Exception {
                return edge.equals((Object)edgeToBeRemoved);
            }
        }).count() > 0L;
    }

    private static boolean parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length == 8) {
                fileOutput = true;
                verticesInputPath = args[0];
                edgesInputPath = args[1];
                edgesInSSSPInputPath = args[2];
                srcEdgeToBeRemoved = Long.parseLong(args[3]);
                trgEdgeToBeRemoved = Long.parseLong(args[4]);
                valEdgeToBeRemoved = Double.parseDouble(args[5]);
                outputPath = args[6];
                maxIterations = Integer.parseInt(args[7]);
            } else {
                System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
                System.out.println("Provide parameters to read input data from files.");
                System.out.println("See the documentation for the correct format of input files.");
                System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <src id edge to be removed> <trg id edge to be removed> <val edge to be removed> <output path> <max iterations>");
                return false;
            }
        }
        return true;
    }

    private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
        if (fileOutput) {
            return Graph.fromCsvReader((String)verticesInputPath, (String)edgesInputPath, (ExecutionEnvironment)env).lineDelimiterEdges("\n").types(Long.class, Double.class, Double.class);
        }
        return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), (ExecutionEnvironment)env);
    }

    private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) {
        if (fileOutput) {
            return Graph.fromCsvReader((String)verticesInputPath, (String)edgesInSSSPInputPath, (ExecutionEnvironment)env).lineDelimiterEdges("\n").types(Long.class, Double.class, Double.class);
        }
        return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), (ExecutionEnvironment)env);
    }

    private static Edge<Long, Double> getEdgeToBeRemoved() {
        if (fileOutput) {
            return new Edge((Object)srcEdgeToBeRemoved, (Object)trgEdgeToBeRemoved, (Object)valEdgeToBeRemoved);
        }
        return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
    }

    public static final class InvalidateMessenger
    extends MessagingFunction<Long, Double, Double, Double> {
        private Edge<Long, Double> edgeToBeRemoved;

        public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
            this.edgeToBeRemoved = edgeToBeRemoved;
        }

        public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
            if (this.getSuperstepNumber() == 1 && ((Long)vertex.getId()).equals(this.edgeToBeRemoved.getSource())) {
                this.sendMessageTo(this.edgeToBeRemoved.getSource(), Double.MAX_VALUE);
            }
            if (this.getSuperstepNumber() > 1) {
                for (Edge edge : this.getEdges()) {
                    this.sendMessageTo(edge.getSource(), Double.MAX_VALUE);
                }
            }
        }
    }

    public static final class VertexDistanceUpdater
    extends VertexUpdateFunction<Long, Double, Double> {
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
            Long outDegree;
            if (inMessages.hasNext() && (outDegree = Long.valueOf(this.getOutDegree() - 1L)) <= 0L) {
                this.setNewVertexValue(Double.MAX_VALUE);
            }
        }
    }
}

