scala - Spark : normalize each row of a DataFrame

标签 scala apache-spark

我有一个 Spark DataFrame,如下所示

df.show()
+------+------+------+
|  col1|  col2|  col3|
+------+------+------+
|   5.0|   5.0|   0.0|
|   2.0|   3.0|   5.0|
|   4.0|   1.0|  10.0|
+------+------+------+

我想规范化每个单独的行,这样操作后,新列将如下所示:

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|     0.5|     0.5|     0.0|
|     0.2|     0.3|     0.5|
|0.266667|0.066667|0.666667|
+--------+--------+--------+

更正式地说,我想要应用的操作是:

对于每一行,

    new_col_i = col_i / (col_1 + col_2 + col_3)

我需要以编程方式执行此操作,而不是列出所有列,因为我的 DataFrame 有很多列。

当前解决方案:

我当前想到的解决方案是创建一个列来表示每行所有条目的总和,然后将每一列除以该总和列。

var newDF = df.withColumn("total", df.columns.map(c => col(c)).reduce((c1, c2) => c1 + c2))

for (c <- Array("col1", "col2", "col3")) {
    newDF = newDF.withColumn("normalized_" + c, col(c).divide(col("total")))
}
newDF.show()

+----+----+----+-----+-------------------+-------------------+------------------+
|col1|col2|col3|total|    normalized_col1|    normalized_col2|   normalized_col3|
+----+----+----+-----+-------------------+-------------------+------------------+
| 5.0| 5.0| 0.0| 10.0|                0.5|                0.5|               0.0|
| 2.0| 3.0| 5.0| 10.0|                0.2|                0.3|               0.5|
| 4.0| 1.0|10.0| 15.0|0.26666666666666666|0.06666666666666667|0.6666666666666666|
+----+----+----+-----+-------------------+-------------------+------------------+

有什么替代方法可以使代码更简洁吗?

最佳答案

您的解决方案是正确的,并且不能改进太多。您可以通过用 foldLeft 替换 for 循环来摆脱 var 的非惯用用法,并使用更多的语法糖,但除此之外将保持不变:

val withTotal = df.withColumn("total", df.columns.map(col).reduce(_ + _))

val result = df.columns.foldLeft(withTotal) {
  (tmp, c) => tmp.withColumn(s"new_$c", $"$c" / $"total")
}
  .drop(df.columns: _*)
  .drop("total")

关于scala - Spark : normalize each row of a DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47641076/

相关文章:

json - 如何解决此Scala/Play编译错误(返回错误的类型)?

apache-spark - 齐柏林飞艇 : How to restart sparkContext in zeppelin

apache-spark - 如何在spark-submit命令中引用.so文件

scala - sbt:我可以将scala编译器插件的源代码放入需要使用该插件编译的项目中吗?

scala - 何时使用类 vs 对象 vs 案例类 vs 特征

scala - 在 Scala Spark 中嵌套 RDD

python - 为什么我的 Spark DataFrame 比 RDD 慢很多?

scala - Spark-sql/Scala 中的 Unpivot 列名称是数字

python - 我们如何使用 SQL-esque "LIKE"标准连接两个 Spark SQL 数据帧?

scala - 特征中未实现方法的返回类型