apache-spark - 如何在 Spark DataFrame 中将列除以总和

标签 apache-spark pyspark apache-spark-sql

如何在 Spark DataFrame 中高效地将列除以它自己的总和,而不立即触发计算?

假设我们有一些数据:

import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as spf

spark = SparkSession.builder.master('local').getOrCreate()

data = spark.range(0, 100)

data # --> DataFrame[id: bigint]

我想在此数据框中创建一个名为“标准化”的新列,其中包含 id / sum(id) 。一种方法是预先计算总和,如下所示:

s = data.select(spf.sum('id')).collect()[0][0]
data2 = data.withColumn('normalized', spf.col('id') / s)
data2 # --> DataFrame[id: bigint, normalized: double]

这工作正常,但它立即触发计算;如果您为许多列定义类似的内容,则会导致对数据进行多次冗余传递。

另一种方法是使用包含整个表的窗口规范:

w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
data3 = data.withColumn('normalized', spf.col('id') / spf.sum('id').over(w))
data3 # --> DataFrame[id: bigint, normalized: double]

在这种情况下,定义 data3 就可以了。 ,但是一旦您尝试实际计算它,Spark 2.2.0 会将所有数据移动到单个分区中,这通常会导致大型数据集的作业失败。

还有哪些其他方法可以解决这个问题,既不触发立即计算,又适用于大型数据集?我对任何解决方案感兴趣,不一定是基于 pyspark 的解决方案.

最佳答案

crossJoin 与聚合是一种方法:

data.crossJoin( 
    data.select(spf.sum('id').alias("sum_id"))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))

但我不会太担心:

That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data.

一次计算多个统计数据:

data2 = data.select(spf.rand(42).alias("x"), spf.randn(42).alias("y"))
mean_x, mean_y = data2.groupBy().mean().first()

剩下的只是对局部表达式的操作:

data2.select(spf.col("x") - mean_x, spf.col("y") - mean_y)

关于apache-spark - 如何在 Spark DataFrame 中将列除以总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48552672/

相关文章:

scala - Spark 并行处理列

scala - 我们能否使用多个 Spark session 来访问两个不同的 Hive 服务器

python - Pyspark 中的 Pickle 错误

python-3.x - pyspark中的内反加入

apache-spark - 在 Databricks 上将 Spark.databricks.service.server.enabled 设置为 true 时到底会发生什么?

python - 将 `SPARK_HOME` 设置为什么?

python - 不要在 Spark (Python) 中写入 None 或空行

apache-spark - Spark 窗口函数中的条件

apache-spark - 在 PySpark 中读取文件在读取整个目录然后过滤和读取目录的一部分之间有什么区别?

pyspark - .isin() 包含数据框中的一列