java - Spark 2.2.0 API : Which one should i prefer Dataset with Groupby combined with aggregate or RDD with ReduceBykey

标签 java performance apache-spark dataset apache-spark-sql

大家好,首先,根据标题,有人可能会说问题已经得到解答,但我的观点是比较ReduceBykey、GroupBykey 的性能,具体针对Dataset 和RDD API。我在许多帖子中看到,ReduceBykey 方法的性能比 GroupByKey 方法更有效,当然我同意这一点。尽管如此,我还是有点困惑,如果我们使用数据集或 RDD,我无法弄清楚这些方法的行为方式。每种情况应该使用哪一个?

我会尝试更具体,因此我将提供我的问题、解决方案以及工作代码,我正在等待您尽早提出改进建议。

+---+------------------+-----+
|id |Text1             |Text2|
+---+------------------+-----+
|1  |one,two,three     |one  |
|2  |four,one,five     |six  |
|3  |seven,nine,one,two|eight|
|4  |two,three,five    |five |
|5  |six,five,one      |seven|
+---+------------------+-----+

这里的重点是检查第三列是否包含在第二列的每一行中,然后收集它们的所有 ID。例如,第三列的单词“one”出现在ID为1,5,2,3的第二列的句子中。

+-----+------------+
|Text2|Set         |
+-----+------------+
|seven|[3]         |
|one  |[1, 5, 2, 3]|
|six  |[5]         |
|five |[5, 2, 4]   |
+-----+------------+

这是我的工作代码

List<Row> data = Arrays.asList(
                RowFactory.create(1, "one,two,three", "one"),
                RowFactory.create(2, "four,one,five", "six"),
                RowFactory.create(3, "seven,nine,one,two", "eight"),
                RowFactory.create(4, "two,three,five", "five"),
                RowFactory.create(5, "six,five,one", "seven")
        );

        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("Text1", DataTypes.StringType, false, Metadata.empty()),
                new StructField("Text2", DataTypes.StringType, false, Metadata.empty())
        });

        Dataset<Row> df = spark.createDataFrame(data, schema);
        df.show(false);
        Dataset<Row> df1 = df.select("id", "Text1")
                .crossJoin(df.select("Text2"))
                .filter(col("Text1").contains(col("Text2")))
                .orderBy(col("Text2"));

        df1.show(false);

        Dataset<Row> df2 = df1
                .groupBy("Text2")
                .agg(collect_set(col("id")).as("Set"));

        df2.show(false);

我的问题分为 3 个子序列:

  • 为了提高性能,我是否需要转换RDD中的数据集并使用ReduceBykey代替数据集groupby?
  • 我应该使用哪一个?为什么?数据集或 RDD
  • 如果您能提供一种更有效的替代解决方案(如果我的方法中存在的话),我将不胜感激

最佳答案

TL;DR 两者都不好,但如果您使用 Dataset,请保留 Dataset

如果与合适的函数一起使用,

Dataset.groupBy 的行为类似于 reduceByKey。不幸的是,如果重复项数量较少,collect_set 的行为与 groupByKey 非常相似。用reduceByKey重写它 won't change a thing .

i would be grateful if you could give an alternative solution that is more efficient if exists in my approach

您能做的最好的事情就是删除crossJoin:

val df = Seq((1, "one,two,three", "one"),
  (2, "four,one,five", "six"),
  (3, "seven,nine,one,two", "eight"),
  (4, "two,three,five", "five"),
  (5, "six,five,one", "seven")).toDF("id", "text1", "text2")

df.select(col("id"), explode(split(col("Text1"), ",")).alias("w"))
  .join(df.select(col("Text2").alias("w")), Seq("w"))
  .groupBy("w")
  .agg(collect_set(col("id")).as("Set")).show

+-----+------------+
|    w|         Set|
+-----+------------+
|seven|         [3]|
|  one|[1, 5, 2, 3]|
|  six|         [5]|
| five|   [5, 2, 4]|
+-----+------------+

关于java - Spark 2.2.0 API : Which one should i prefer Dataset with Groupby combined with aggregate or RDD with ReduceBykey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47621626/

相关文章:

java - 从我的插件中清除 Eclipse 错误日志 View

java - 如何用基本表格填充 JScrollPane

java - 使用Spark编写数据集时,如何将驼峰式列名修改为小写并带下划线?

java - 分割后字符串变成空白

java - `ServletContext`接口(interface)是否由Tomcat服务器内部的任何类实现?

c - 为什么数字运算程序在发散为 NaN 时开始运行得更慢?

performance - 将谓词应用于 Prolog : requesting advice on implementation choices 中列表的子集

python - Flask 的请求和基本分析信息

python - 如何提高增量DeltaLake表的合并操作的性能?

apache-spark - 热衷于对远程 Spark Master 执行 "spark-submit"?