python - 当 python 函数比它们更快时,为什么我们使用 pyspark UDF? (注意。不用担心 Spark SQL 命令)

标签 python pyspark user-defined-functions

我有一个数据框:

df = (spark
  .range(0, 10 * 1000 * 1000)\
  .withColumn('id', (col('id') / 1000).cast('integer'))\
  .withColumn('v', rand()))

输出:

+---+-------------------+
| id|                  v|
+---+-------------------+
|  0|0.05011803459635367|
|  0| 0.6749337782428327|
|  0| 0.9449105904567048|
|  0| 0.9183605955607251|
|  0|  0.648596393346793|
+---+-------------------+

现在,可以通过 SQL 函数和 UDF 完成一个简单的 - 将 1 添加到“v”。

如果我们忽略 SQL(性能最佳)

我们可以创建一个 UDF:

@udf("double")
def plus_one(v):
    return v + 1

并调用它:

df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

时间:16.5秒

但这是我的问题:

如果我使用udf并直接写入:

def plus_one(v):
        return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

所用时间 - 352ms

简而言之,UDF 查询花费了约 16 秒,而普通的 Python 函数花费了约 350 毫秒

为了比较,

df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()

时间:347ms

这是我的困境:

如果我可以使用一个普通的 python 函数来执行相同的场景,该函数的性能与内置函数相当......

问。为什么我们不直接使用Python函数呢?

问。仅当我们计划像命令一样在 SQL 中使用 UDF 时,注册 UDF 才重要吗?

一定有一些优化原因导致我们不这样做......或者可能与 Spark 集群的工作方式有关?

[ 已经回答了 2 个问题,但这两个问题都以“首选 SQL 内置函数...”结尾 我正在将 python 函数与 UDF 及其在 pyspark 应用程序中的可行性进行比较。 ]

编辑: 我也用 pandas_udf 做到了这一点:

@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1

df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()

时间:5.26秒

我附上了屏幕截图:

The output for Adding 1 to value - Python funtion (standalone), UDF, SQL

最佳答案

你的场景之所以有效,是因为实际上你没有在 python 中添加 1,它是在 Java 中添加的,其添加方式与使用 SQL 时使用的方式非常相似。

让我们把这个案例分开:

  1. 您执行的 plus_one(df.v) 相当于仅传递 df.v + 1
  2. 尝试在您最喜欢的 repl 中输入 df.v + 1,您会看到它返回 Column 类型的对象。
  3. 怎么可能呢? Column 类覆盖了 __radd__ 魔术方法(以及其他一些方法),并返回新的 Column 实例,其中包含向指定列加 1 的指令。

总之:withColumn 始终接受 Column 类型的对象作为第二个参数,而向列添加 1 的技巧就是 Python 的魔力。

这就是为什么它比udf矢量化udf工作得更快:它们需要运行python进程,序列化/反序列化数据(矢量化udf可以使用arrow更快地工作 以避免序列化/反序列化),在较慢的 python 进程中计算。

关于python - 当 python 函数比它们更快时,为什么我们使用 pyspark UDF? (注意。不用担心 Spark SQL 命令),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64207466/

相关文章:

python - 在pycharm中查找导入路径

python - 编辑 csv 以按特定顺序显示 5 列,同时将格式应用于日期时间字段

python - 遍历图表(查找所有链接的问题)

python - 如何处理数据框中可变的列数

apache-spark - 在 pyspark 数据框中添加具有另一列最大值的新列

python - Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python

excel - 刷新 Excel VBA 函数结果

apache-spark - 如何将spark sql数据框的摘要写入excel文件

excel - 如何为多变量索引匹配公式创建 UDF

java - 如何使用 spark UDF 返回复杂类型