考虑以下数据帧:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
可以使用以下代码创建:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
有没有办法直接修改
ArrayType()
栏目 "names"
通过对每个元素应用一个函数,而不使用 udf
?例如,假设我想应用函数
foo
到 "names"
柱子。 (我将使用 foo
是 str.upper
的示例仅用于说明目的,但我的问题是关于可应用于可迭代元素的任何有效函数。)foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
TypeError: Column is not iterable
我可以使用
udf
来做到这一点:foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
在这个特定的例子中,我可以避免
udf
通过爆炸列,调用 pyspark.sql.functions.upper()
,然后 groupBy
和 collect_list
:df.select('type', f.explode('names').alias('name'))\
.withColumn('name', f.upper(f.col('name')))\
.groupBy('type')\
.agg(f.collect_list('name').alias('names'))\
.show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
但这是很多代码来做一些简单的事情。是否有更直接的方法来迭代
ArrayType()
的元素?使用 spark-dataframe 函数?
最佳答案
在 Spark < 2.4 您可以使用用户定义的函数:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType
def transform(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@udf(ArrayType(t))
def _(xs):
if xs is not None:
return [f(x) for x in xs]
return _
foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
考虑到
explode
的高成本+ collect_list
习惯上,这种方法几乎完全是首选,尽管它有内在成本。在 Spark 2.4 或以后您可以使用
transform
* 与 upper
(见 SPARK-23909):from pyspark.sql.functions import expr
df.withColumn(
'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
也可以使用
pandas_udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
def transform_pandas(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
def _(xs):
return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
return _
foo_udf_pandas = transform_pandas(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
虽然只有最新的 Arrow/PySpark 组合支持处理
ArrayType
列( SPARK-24259 , SPARK-21187 )。尽管如此,在支持任意 Python 函数的同时,此选项应该比标准 UDF 更有效(尤其是具有较低的 serde 开销)。* A number of other higher order functions are also supported ,包括但不限于
filter
和 aggregate
.见例如关于apache-spark - 类型错误 : Column is not iterable - How to iterate over ArrayType()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48993439/