python - PySpark 在每个 DataFrame 行上执行普通 Python 函数

标签 python dataframe apache-spark pyspark apache-spark-sql

我有包含数百万行的 Spark DataFrame DF1。每行最多有 100 列。

col1 | col2 | col3 | ... | colN
--------------------------------
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...

此外,我还有另一个 DataFrame DF2,其中有数百行,其中包含名称和正文列。名称包含函数名称,主体包含纯 Python 代码,返回 true 或 false 的 bool 函数。这些函数在其逻辑内部可以引用 DF1 中单行中的任何列。

func_name | func_body
-----------------------------------------------
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 

我需要将这两个 DataFrame - DF1 与 DF2 连接起来,并将 Df2 中的每个函数应用到 DF1 中的每一行。每个函数必须能够接受来自 DF1 的参数,假设带有键/值对的字典数组,这些键/值对表示 DF1 中相应行的所有列的名称/值。

我知道如何连接 DF1 和 DF2,而且我知道 Python 函数的执行无法以分布式方式工作。现在就这样好了。这是一个暂时的解决方案。我只需要将 DF1 中的所有行分配到工作节点上,并将每个 Python 函数应用于 Apache Spark 应用程序的不同任务中的 DF1 的每一行。评估 eval() 它们并传递带有键/值对的字典数组,正如我上面提到的。

一般来说,每个Python函数都是一个标签,我想将其分配给DF1中的行,以防某些函数返回true。例如,这是生成的 DataFrame DF3:

col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

PySpark 是否可行?如果可以,您能否举例说明如何实现?以 DF.columns 中的 Map 作为输入参数的 UDF 函数是正确的方法还是可以以更简单的方式完成? Spark对于某一时刻可以注册多少UDF函数(数量)有限制吗?

最佳答案

您可以使用 SQL 表达式来实现这一点,该表达式可以使用 expr 进行评估。但是,您将无法连接 2 个 DataFrame,因为 SQL 表达式无法计算为列值(请参阅此 post ),因此您必须将函数收集到一个列表中(因为您只有数百行,它可以适合内存)。

这是一个可以根据您的要求进行调整的工作示例:

data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
         (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
         (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]

df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])

data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
         ("func2", "col6 = 30 or col1 * col4 > 20"),
         ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
         ("func4", "col2 like 'val%' and col1 > 0")]

df2 = spark.createDataFrame(data2, ["func_name", "func_body"])

# get functions into a list
functions = df2.collect()

# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]

# add new column tags
df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", expr("filter(tags, x -> x is not null)")) \
    .show(truncate=False)

添加数组列tags后,filter函数用于删除与不满足表达式对应的空值。此功能仅从 Spark 2.4+ 开始可用,对于旧版本,您必须使用 UDF。

给予:

+----+----+----+----+----+----+---------------------+
|col1|col2|col3|col4|col5|col6|tags                 |
+----+----+----+----+----+----+---------------------+
|1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
|0   |val2|7   |8   |B   |20  |[func3]              |
|9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
|10  |val4|2   |9   |D   |30  |[func2, func4]       |
|20  |val5|6   |5   |E   |50  |[func2, func4]       |
|3   |val6|100 |2   |X   |45  |[func4]              |
+----+----+----+----+----+----+---------------------+

关于python - PySpark 在每个 DataFrame 行上执行普通 Python 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60201221/

相关文章:

python - QLineEdit 对象的 PyQt 集合

python - 创建相同 BST 的节点插入序列的数量?

r - 从 R 中的现有数据框中提取数据(或 reshape )数据框

python - 将多索引数据帧写入 csv 而不更改其格式

python - 如何在 emacs python 模式中取消解释器命令

python - 将数组添加到另一个 Python 的末尾

r - 按列名称中的 ID 操作数据框列

azure - 如何直接从 Azure Databricks Notebook 读取 Azure Blob 存储文件

scala - Spark Scala Dataframe 描述非数字列

apache-spark - 启动集群时在 EMR 上配置 Zeppelin 的 Spark 解释器