我有包含数百万行的 Spark DataFrame DF1。每行最多有 100 列。
col1 | col2 | col3 | ... | colN
--------------------------------
v11 | v12 | v13 | ... | v1N
v21 | v22 | v23 | ... | v2N
... | ... | ... | ... | ...
此外,我还有另一个 DataFrame DF2,其中有数百行,其中包含名称和正文列。名称包含函数名称,主体包含纯 Python 代码,返回 true 或 false 的 bool 函数。这些函数在其逻辑内部可以引用 DF1 中单行中的任何列。
func_name | func_body
-----------------------------------------------
func1 | col2 < col45
func2 | col11.contains("London") and col32*col15 < col21
funcN | ....
我需要将这两个 DataFrame - DF1 与 DF2 连接起来,并将 Df2 中的每个函数应用到 DF1 中的每一行。每个函数必须能够接受来自 DF1 的参数,假设带有键/值对的字典数组,这些键/值对表示 DF1 中相应行的所有列的名称/值。
我知道如何连接 DF1 和 DF2,而且我知道 Python 函数的执行无法以分布式方式工作。现在就这样好了。这是一个暂时的解决方案。我只需要将 DF1 中的所有行分配到工作节点上,并将每个 Python 函数应用于 Apache Spark 应用程序的不同任务中的 DF1 的每一行。评估 eval()
它们并传递带有键/值对的字典数组,正如我上面提到的。
一般来说,每个Python函数都是一个标签,我想将其分配给DF1中的行,以防某些函数返回true。例如,这是生成的 DataFrame DF3:
col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11 | v12 | v13 | ... | v1N | [func1, func76, funcN]
v21 | v22 | v23 | ... | v2N | [func32]
... | ... | ... | ... | ... | [..., ..., ..., ..., ...]
PySpark 是否可行?如果可以,您能否举例说明如何实现?以 DF.columns
中的 Map
作为输入参数的 UDF 函数是正确的方法还是可以以更简单的方式完成? Spark对于某一时刻可以注册多少UDF函数(数量)有限制吗?
最佳答案
您可以使用 SQL 表达式来实现这一点,该表达式可以使用 expr
进行评估。但是,您将无法连接 2 个 DataFrame,因为 SQL 表达式无法计算为列值(请参阅此 post ),因此您必须将函数收集到一个列表中(因为您只有数百行,它可以适合内存)。
这是一个可以根据您的要求进行调整的工作示例:
data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
(9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
(20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]
df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])
data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
("func2", "col6 = 30 or col1 * col4 > 20"),
("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
("func4", "col2 like 'val%' and col1 > 0")]
df2 = spark.createDataFrame(data2, ["func_name", "func_body"])
# get functions into a list
functions = df2.collect()
# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]
# add new column tags
df1.withColumn("tags", array(*satisfied_expr)) \
.withColumn("tags", expr("filter(tags, x -> x is not null)")) \
.show(truncate=False)
添加数组列tags
后,filter
函数用于删除与不满足表达式对应的空值。此功能仅从 Spark 2.4+ 开始可用,对于旧版本,您必须使用 UDF。
给予:
+----+----+----+----+----+----+---------------------+
|col1|col2|col3|col4|col5|col6|tags |
+----+----+----+----+----+----+---------------------+
|1 |val1|4 |5 |A |10 |[func1, func3, func4]|
|0 |val2|7 |8 |B |20 |[func3] |
|9 |val3|8 |1 |C |30 |[func2, func3, func4]|
|10 |val4|2 |9 |D |30 |[func2, func4] |
|20 |val5|6 |5 |E |50 |[func2, func4] |
|3 |val6|100 |2 |X |45 |[func4] |
+----+----+----+----+----+----+---------------------+
关于python - PySpark 在每个 DataFrame 行上执行普通 Python 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60201221/