apache-spark - 类型错误 : Column is not iterable - How to iterate over ArrayType()?

标签 apache-spark pyspark spark-dataframe pyspark-sql

考虑以下数据帧:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

可以使用以下代码创建:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)

有没有办法直接修改ArrayType()栏目 "names"通过对每个元素应用一个函数,而不使用 udf ?

例如,假设我想应用函数 foo"names"柱子。 (我将使用 foostr.upper 的示例仅用于说明目的,但我的问题是关于可应用于可迭代元素的任何有效函数。)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()

TypeError: Column is not iterable



我可以使用 udf 来做到这一点:

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

在这个特定的例子中,我可以避免 udf通过爆炸列,调用 pyspark.sql.functions.upper() ,然后 groupBycollect_list :

df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

但这是很多代码来做一些简单的事情。是否有更直接的方法来迭代 ArrayType() 的元素?使用 spark-dataframe 函数?

最佳答案

Spark < 2.4 您可以使用用户定义的函数:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

考虑到 explode 的高成本+ collect_list 习惯上,这种方法几乎完全是首选,尽管它有内在成本。

Spark 2.4 或以后您可以使用 transform * 与 upper (见 SPARK-23909):

from pyspark.sql.functions import expr

df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

也可以使用 pandas_udf
from pyspark.sql.functions import pandas_udf, PandasUDFType

def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _

foo_udf_pandas = transform_pandas(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

虽然只有最新的 Arrow/PySpark 组合支持处理 ArrayType列( SPARK-24259SPARK-21187 )。尽管如此,在支持任意 Python 函数的同时,此选项应该比标准 UDF 更有效(尤其是具有较低的 serde 开销)。

* A number of other higher order functions are also supported ,包括但不限于 filter aggregate .见例如
  • Querying Spark SQL DataFrame with complex types
  • How to slice and sum elements of array column?
  • Filter array column content
  • Spark Scala row-wise average by handling null .
  • How to use transform higher-order function? .
  • 关于apache-spark - 类型错误 : Column is not iterable - How to iterate over ArrayType()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48993439/

    相关文章:

    python - 如何使用 Spark (pyspark) 编写 Parquet 文件?

    apache-spark - 使用 DataFrame 按组划分的 Python Spark 累积总和

    java - 如何为给定列添加行和值?

    azure - Hadoop API使用pyspark下载文件

    python - 如何在 Zeppelin 中的 %pyspark 解释器和 %python 解释器之间传递数据集?

    apache-spark - 如何爆炸列?

    apache-spark - Spark的Row和InternalRow类型之间的区别

    sql-server - 将数据从 MS SQL 表加载到 snappyData

    apache-spark - "TypeError: an integer is required (got type bytes)"在 Python 3.8 上导入 pyspark 时

    apache-spark - SparkConf 不读取 spark-submit 参数