java - 为什么使用 Dataset 的 PageRank 作业比 RDD 慢得多?

标签 java apache-spark apache-spark-sql apache-spark-dataset

我实现了this example使用较新的 Dataset API 在 Java 中实现 PageRank。当我针对使用旧 RDD API 的示例对我的代码进行基准测试时,我发现我的代码需要 186 秒,而基准仅需要 109 秒。是什么造成了这种差异? (旁注:即使数据库只包含少量条目,Spark 花费数百秒是否正常?)

我的代码:

Dataset<Row> outLinks = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "storagepage_outlinks", props);
Dataset<Row> page = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "pages", props);

outLinks = page.join(outLinks, page.col("id").equalTo(outLinks.col("storagepage_id")));
outLinks = outLinks.distinct().groupBy(outLinks.col("url")).agg(collect_set("outlinks")).cache();

Dataset<Row> ranks = outLinks.map(row -> new Tuple2<>(row.getString(0), 1.0), Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "rank");

for (int i = 0; i < iterations; i++) {
    Dataset<Row> joined = outLinks.join(ranks, new Set.Set1<>("url").toSeq());
    Dataset<Row> contribs = joined.flatMap(row -> {
        List<String> links = row.getList(1);
        double rank = row.getDouble(2);
        return links
                .stream()
                .map(s -> new Tuple2<>(s, rank / links.size()))
                .collect(Collectors.toList()).iterator();
    }, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "num");

    Dataset<Tuple2<String, Double>> reducedByKey =
            contribs.groupByKey(r -> r.getString(0), Encoders.STRING())
            .mapGroups((s, iterator) -> {
                double sum = 0;
                while (iterator.hasNext()) {
                    sum += iterator.next().getDouble(1);
                }
                return new Tuple2<>(s, sum);
            }, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE()));
    ranks = reducedByKey.map(t -> new Tuple2<>(t._1, .15 + t._2 * .85),
            Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "rank");
}
ranks.show();

使用 RDD 的示例代码(适用于从我的数据库读取):

Dataset<Row> outLinks = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "storagepage_outlinks", props);
Dataset<Row> page = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "pages", props);

outLinks = page.join(outLinks, page.col("id").equalTo(outLinks.col("storagepage_id")));
outLinks = outLinks.distinct().groupBy(outLinks.col("url")).agg(collect_set("outlinks")).cache(); // TODO: play with this cache
JavaPairRDD<String, Iterable<String>> links = outLinks.javaRDD().mapToPair(row -> new Tuple2<>(row.getString(0), row.getList(1)));

// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);

// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < 20; current++) {
    // Calculates URL contributions to the rank of other URLs.
    JavaPairRDD<String, Double> contribs = links.join(ranks).values()
            .flatMapToPair(s -> {
                int urlCount = size(s._1());
                List<Tuple2<String, Double>> results = new ArrayList<>();
                for (String n : s._1) {
                    results.add(new Tuple2<>(n, s._2() / urlCount));
                }
                return results.iterator();
            });

    // Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey((x, y) -> x + y).mapValues(sum -> 0.15 + sum * 0.85);
}

// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2<?,?> tuple : output) {
    System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
}

最佳答案

TL;DR 它可能很旧 Avoid GroupByKey东西。

很难确定,但您的 Dataset 代码相当于 groupByKey:

groupByKey(...).mapGroups(...)

这意味着它先洗牌,然后减少数据。

您的 RDD 使用 reduceByKey - 这应该通过应用本地缩减来减少洗牌大小。如果您希望此代码在某种程度上等效,您应该使用 groupByKey(...).reduceGroups(...)< 重写 groupByKey(...).mapGroups(...)/.

另一个可能的候选者是配置。 spark.sql.shuffle.partitions 的默认值为 200,将用于 Dataset 聚合。如果

the database only contains a handful of entries?

这是严重的矫枉过正。

RDD 将使用 spark.default.parallelism 或基于父数据的值,这通常要温和得多。

关于java - 为什么使用 Dataset 的 PageRank 作业比 RDD 慢得多?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47579716/

相关文章:

java - 用于发布和开发应用程序的 Apache Spark Maven 依赖项

java - 如何使用 Selenium 根据其 href 属性在页面中选择链接?

java - 运行 spark 时出现堆错误?

java - 如何为我自己的项目正确导入 commons.apache.math 库

java.lang.ClassNotFoundException : org. Spark_project.guava.collect.MapMaker

performance - Spark 。数据缓存?

scala - 如何在 spark 3.0+ 中获得一年中的一周?

scala - 根据时间戳列过滤数据框

java - 了解基本的 Java 特定面试问题

java - 为什么我的java LinkedList添加了相同的值