我有一个数据框:
df = (spark
.range(0, 10 * 1000 * 1000)\
.withColumn('id', (col('id') / 1000).cast('integer'))\
.withColumn('v', rand()))
输出:
+---+-------------------+
| id| v|
+---+-------------------+
| 0|0.05011803459635367|
| 0| 0.6749337782428327|
| 0| 0.9449105904567048|
| 0| 0.9183605955607251|
| 0| 0.648596393346793|
+---+-------------------+
现在,可以通过 SQL 函数和 UDF 完成一个简单的 - 将 1 添加到“v”。
如果我们忽略 SQL(性能最佳)
我们可以创建一个 UDF:
@udf("double")
def plus_one(v):
return v + 1
并调用它:
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
时间:16.5秒
但这是我的问题:
如果我不使用udf并直接写入:
def plus_one(v):
return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
所用时间 - 352ms
简而言之,UDF 查询花费了约 16 秒,而普通的 Python 函数花费了约 350 毫秒
为了比较,
df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()
时间:347ms
这是我的困境:
如果我可以使用一个普通的 python 函数来执行相同的场景,该函数的性能与内置函数相当......
问。为什么我们不直接使用Python函数呢?
问。仅当我们计划像命令一样在 SQL 中使用 UDF 时,注册 UDF 才重要吗?
一定有一些优化原因导致我们不这样做......或者可能与 Spark 集群的工作方式有关?
[ 已经回答了 2 个问题,但这两个问题都以“首选 SQL 内置函数...”结尾 我正在将 python 函数与 UDF 及其在 pyspark 应用程序中的可行性进行比较。 ]
编辑: 我也用 pandas_udf 做到了这一点:
@pandas_udf('double')
def vectorized_plus_one(v):
return v + 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
时间:5.26秒
我附上了屏幕截图:
The output for Adding 1 to value - Python funtion (standalone), UDF, SQL
最佳答案
你的场景之所以有效,是因为实际上你没有在 python 中添加 1,它是在 Java 中添加的,其添加方式与使用 SQL 时使用的方式非常相似。
让我们把这个案例分开:
- 您执行的
plus_one(df.v)
相当于仅传递df.v + 1
- 尝试在您最喜欢的 repl 中输入
df.v + 1
,您会看到它返回Column
类型的对象。 - 怎么可能呢?
Column
类覆盖了__radd__
魔术方法(以及其他一些方法),并返回新的Column
实例,其中包含向指定列加 1 的指令。
总之:withColumn
始终接受 Column
类型的对象作为第二个参数,而向列添加 1 的技巧就是 Python 的魔力。
这就是为什么它比udf
和矢量化udf
工作得更快:它们需要运行python进程,序列化/反序列化数据(矢量化udf可以使用arrow更快地工作
以避免序列化/反序列化),在较慢的 python 进程中计算。
关于python - 当 python 函数比它们更快时,为什么我们使用 pyspark UDF? (注意。不用担心 Spark SQL 命令),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64207466/