python - 使用 Python 的 reduce() 连接多个 PySpark DataFrame

标签 python python-3.x pyspark apache-spark-sql

有谁知道为什么使用 Python3 的 functools.reduce() 在加入多个 PySpark DataFrames 时会导致比使用 for 循环迭代加入相同 DataFrames 更差的性能?具体来说,这会导致速度大幅下降,然后出现内存不足错误:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

而这个不是:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

任何想法将不胜感激。谢谢!

最佳答案

只要您使用 CPython(在这种特定情况下,不同的实现可以但实际上不应该表现出明显不同的行为)。如果你看一下reduce implementation您会发现它只是一个具有最少异常处理的 for 循环。

核心完全等同于你使用的循环

for element in it:
    value = function(value, element)

并且没有证据支持任何特殊行为的说法。

另外简单测试了 Spark 连接的帧数实际限制(连接 are among the most expensive operations in Spark)

dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

显示直接for循环之间的时间没有显着差异

def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)                 
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

减少调用

from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs) 

%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

类似地,整体 JVM 行为模式在 for 循环之间具有可比性

For loop CPU and Memory Usage - VisualVM

减少

reduce CPU and Memory Usage - VisualVM

最终两者都生成相同的执行计划

g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

这表明在评估计划时没有差异并且很可能发生 OOM。

换句话说,您的相关性并不意味着因果关系,观察到的性能问题不太可能与您用于组合 DataFrames 的方法有关。

关于python - 使用 Python 的 reduce() 连接多个 PySpark DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44977549/

相关文章:

apache-spark - 我们如何保存巨大的 pyspark 数据框?

python - 将整个文件夹从本地移动到 S3

python - Pandas 数据框将每一组除以一个函数中的最大值

python - 正则表达式 - 跟踪号码 2018

Python从文件中打印随机行而不重复

Python - 脚本在守护进程后不执行其余代码

apache-spark - Hive 元存储中的上次访问时间更新

python - bash Shell 脚本: how to read a file line by line and pass to a variable

python - MySQLdb执行超时

python - 由 : java. io.IOException : error=13, Permission denied 引起