/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class DanglingPageRankITCase
extends MultipleProgramsTestBaseJUnit4 {
    private static final String AGGREGATOR_NAME = "pagerank.aggregator";

    public DanglingPageRankITCase(MultipleProgramsTestBaseJUnit4.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testDanglingPageRank() {
        try {
            int numIterations = 25;
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource vertices = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1L, (Object)false), new Tuple2((Object)2L, (Object)false), new Tuple2((Object)5L, (Object)false), new Tuple2((Object)3L, (Object)true), new Tuple2((Object)4L, (Object)false)});
            DataSource edges = env.fromElements((Object[])new PageWithLinks[]{new PageWithLinks(2L, new long[]{1L}), new PageWithLinks(5L, new long[]{2L, 4L}), new PageWithLinks(4L, new long[]{3L, 2L}), new PageWithLinks(1L, new long[]{4L, 2L, 3L})});
            final long numVertices = vertices.count();
            final long numDanglingVertices = vertices.filter((FilterFunction)new FilterFunction<Tuple2<Long, Boolean>>(){

                public boolean filter(Tuple2<Long, Boolean> value) {
                    return (Boolean)value.f1;
                }
            }).count();
            MapOperator verticesWithInitialRank = vertices.map((MapFunction)new MapFunction<Tuple2<Long, Boolean>, PageWithRankAndDangling>(){

                public PageWithRankAndDangling map(Tuple2<Long, Boolean> value) {
                    return new PageWithRankAndDangling((Long)value.f0, 1.0 / (double)numVertices, (Boolean)value.f1);
                }
            });
            IterativeDataSet iteration = verticesWithInitialRank.iterate(25);
            iteration.getAggregators().registerAggregationConvergenceCriterion(AGGREGATOR_NAME, (Aggregator)new PageRankStatsAggregator(), (ConvergenceCriterion)new DiffL1NormConvergenceCriterion());
            JoinOperator.EquiJoin partialRanks = iteration.join((DataSet)edges).where(new String[]{"pageId"}).equalTo(new String[]{"pageId"}).with((FlatJoinFunction)new FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank>(){

                public void join(PageWithRankAndDangling page, PageWithLinks links, Collector<PageWithRank> out) {
                    double rankToDistribute = page.rank / (double)links.targets.length;
                    PageWithRank output = new PageWithRank(0L, rankToDistribute);
                    long[] lArray = links.targets;
                    int n = lArray.length;
                    for (int i = 0; i < n; ++i) {
                        long target;
                        output.pageId = target = lArray[i];
                        out.collect((Object)output);
                    }
                }
            });
            CoGroupOperator newRanks = iteration.coGroup((DataSet)partialRanks).where(new String[]{"pageId"}).equalTo(new String[]{"pageId"}).with((CoGroupFunction)new RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling>(){
                private static final double BETA = 0.85;
                private final double randomJump;
                private PageRankStatsAggregator aggregator;
                private double danglingRankFactor;
                {
                    this.randomJump = 0.15000000000000002 / (double)numVertices;
                }

                public void open(OpenContext openContext) throws Exception {
                    int currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
                    this.aggregator = (PageRankStatsAggregator)this.getIterationRuntimeContext().getIterationAggregator(DanglingPageRankITCase.AGGREGATOR_NAME);
                    if (currentIteration == 1) {
                        this.danglingRankFactor = 0.85 * (double)numDanglingVertices / ((double)numVertices * (double)numVertices);
                    } else {
                        PageRankStats previousAggregate = (PageRankStats)this.getIterationRuntimeContext().getPreviousIterationAggregate(DanglingPageRankITCase.AGGREGATOR_NAME);
                        this.danglingRankFactor = 0.85 * previousAggregate.danglingRank() / (double)numVertices;
                    }
                }

                public void coGroup(Iterable<PageWithRankAndDangling> currentPages, Iterable<PageWithRank> partialRanks, Collector<PageWithRankAndDangling> out) {
                    long edges = 0L;
                    double summedRank = 0.0;
                    for (PageWithRank partial : partialRanks) {
                        summedRank += partial.rank;
                        ++edges;
                    }
                    double rank = 0.85 * summedRank + this.randomJump + this.danglingRankFactor;
                    PageWithRankAndDangling currentPage = currentPages.iterator().next();
                    double currentRank = currentPage.rank;
                    boolean isDangling = currentPage.dangling;
                    double danglingRankToAggregate = isDangling ? rank : 0.0;
                    long danglingVerticesToAggregate = isDangling ? 1L : 0L;
                    double diff = Math.abs(currentRank - rank);
                    this.aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1L, edges);
                    currentPage.rank = rank;
                    out.collect((Object)currentPage);
                }
            });
            List result = iteration.closeWith((DataSet)newRanks).collect();
            double totalRank = 0.0;
            for (PageWithRankAndDangling r : result) {
                totalRank += r.rank;
                Assert.assertTrue((r.pageId >= 1L && r.pageId <= 5L ? 1 : 0) != 0);
                Assert.assertTrue((r.pageId != 3L || r.dangling ? 1 : 0) != 0);
            }
            Assert.assertEquals((double)1.0, (double)totalRank, (double)0.001);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static class DiffL1NormConvergenceCriterion
    implements ConvergenceCriterion<PageRankStats> {
        private static final double EPSILON = 5.0E-5;

        private DiffL1NormConvergenceCriterion() {
        }

        public boolean isConverged(int iteration, PageRankStats pageRankStats) {
            return pageRankStats.diff() < 5.0E-5;
        }
    }

    private static class PageRankStatsAggregator
    implements Aggregator<PageRankStats> {
        private double diff;
        private double rank;
        private double danglingRank;
        private long numDanglingVertices;
        private long numVertices;
        private long edges;

        private PageRankStatsAggregator() {
        }

        public PageRankStats getAggregate() {
            return new PageRankStats(this.diff, this.rank, this.danglingRank, this.numDanglingVertices, this.numVertices, this.edges);
        }

        public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta, long verticesDelta, long edgesDelta) {
            this.diff += diffDelta;
            this.rank += rankDelta;
            this.danglingRank += danglingRankDelta;
            this.numDanglingVertices += danglingVerticesDelta;
            this.numVertices += verticesDelta;
            this.edges += edgesDelta;
        }

        public void aggregate(PageRankStats pageRankStats) {
            this.diff += pageRankStats.diff();
            this.rank += pageRankStats.rank();
            this.danglingRank += pageRankStats.danglingRank();
            this.numDanglingVertices += pageRankStats.numDanglingVertices();
            this.numVertices += pageRankStats.numVertices();
            this.edges += pageRankStats.edges();
        }

        public void reset() {
            this.diff = 0.0;
            this.rank = 0.0;
            this.danglingRank = 0.0;
            this.numDanglingVertices = 0L;
            this.numVertices = 0L;
            this.edges = 0L;
        }
    }

    public static class PageRankStats
    implements Value {
        private double diff;
        private double rank;
        private double danglingRank;
        private long numDanglingVertices;
        private long numVertices;
        private long edges;

        public PageRankStats() {
        }

        public PageRankStats(double diff, double rank, double danglingRank, long numDanglingVertices, long numVertices, long edges) {
            this.diff = diff;
            this.rank = rank;
            this.danglingRank = danglingRank;
            this.numDanglingVertices = numDanglingVertices;
            this.numVertices = numVertices;
            this.edges = edges;
        }

        public double diff() {
            return this.diff;
        }

        public double rank() {
            return this.rank;
        }

        public double danglingRank() {
            return this.danglingRank;
        }

        public long numDanglingVertices() {
            return this.numDanglingVertices;
        }

        public long numVertices() {
            return this.numVertices;
        }

        public long edges() {
            return this.edges;
        }

        public void write(DataOutputView out) throws IOException {
            out.writeDouble(this.diff);
            out.writeDouble(this.rank);
            out.writeDouble(this.danglingRank);
            out.writeLong(this.numDanglingVertices);
            out.writeLong(this.numVertices);
            out.writeLong(this.edges);
        }

        public void read(DataInputView in) throws IOException {
            this.diff = in.readDouble();
            this.rank = in.readDouble();
            this.danglingRank = in.readDouble();
            this.numDanglingVertices = in.readLong();
            this.numVertices = in.readLong();
            this.edges = in.readLong();
        }

        public String toString() {
            return "PageRankStats: diff [" + this.diff + "], rank [" + this.rank + "], danglingRank [" + this.danglingRank + "], numDanglingVertices [" + this.numDanglingVertices + "], numVertices [" + this.numVertices + "], edges [" + this.edges + "]";
        }
    }

    public static class PageWithLinks {
        public long pageId;
        public long[] targets;

        public PageWithLinks() {
        }

        public PageWithLinks(long pageId, long[] targets) {
            this.pageId = pageId;
            this.targets = targets;
        }
    }

    public static class PageWithRankAndDangling {
        public long pageId;
        public double rank;
        public boolean dangling;

        public PageWithRankAndDangling() {
        }

        public PageWithRankAndDangling(long pageId, double rank, boolean dangling) {
            this.pageId = pageId;
            this.rank = rank;
            this.dangling = dangling;
        }

        public String toString() {
            return "PageWithRankAndDangling{pageId=" + this.pageId + ", rank=" + this.rank + ", dangling=" + this.dangling + '}';
        }
    }

    public static class PageWithRank {
        public long pageId;
        public double rank;

        public PageWithRank() {
        }

        public PageWithRank(long pageId, double rank) {
            this.pageId = pageId;
            this.rank = rank;
        }
    }
}

