python - 将特定功能应用于 Spark 数据框中的结构化列的有效方法?

标签 python apache-spark dataframe apache-spark-sql

我在 Spark 数据框中有数据,其中有一列 col 包含以下形式的结构化数据:

------ col -------   # Column whose elements are structures
field0  field1 …     # StructType with StructFields (variable names and count)
[1,2,3] [4,5]  [6]   # Each field is of type ArrayType
[1,2]   [3]    []
…

其中字段的数量和名称不固定

计算每行中元素总数的最有效方法是什么?在上面的示例中,预期的结果数据框将是:

num_elements
6
3
…

总是有用户定义函数的解决方案:

from pyspark.sql.types import IntegerType

def num_elements(all_arrays_in_row):
    return sum(map(len, all_arrays_in_row))
num_elements = pyspark.sql.functions.udf(num_elements, IntegerType())

data_frame.select(num_elements(data_frame.col)).show()  # Number of elements in each row

现在,我不确定这是否通常有效,因为:

  1. 函数 num_elements() 在 Python 中。
  2. 如果由于某种原因这些字段碰巧没有存储在一起,map() 会在计算长度之前强制获取每个数组。

更一般地说,“纯”Spark 方法会更有效,但它让我望而却步。到目前为止我尝试的是以下方法,但这比上面的方法更麻烦,而且也不完整:

  1. 使用[field.name for field in data_frame.select("col").schema.fields[0].dataType.fields]
  2. 获取字段名称field0(繁琐)。
  3. 对于每个字段名,有效地计算其数组的大小:

    sizes_one_field = data_frame.select(pyspark.sql.functions.size(
                                        data_frame.col.getField(field_name))
    

现在,我被困在这一点上,因为我不确定如何将 1 列数据帧 sizes_one_field 加在一起(每个字段名称都有一个)。另外,也许有一种方法可以将 size() 函数直接应用于 Spark 中 col 列的每个字段(通过某种映射?)?还是一些完全不同的方法来获取每行中的元素总数?

最佳答案

您可以尝试以下操作:

from pyspark.sql import functions as f

result = df.select(sum((f.size(df[col_name]) for col_name in df.columns), f.lit(0)))

此解决方案使用 pyspark.sql 内置函数,并将以优化的方式执行。有关这些功能的更多信息,您可以查看其pyspark documentation。 .

关于python - 将特定功能应用于 Spark 数据框中的结构化列的有效方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40573280/

相关文章:

python - 单元测试 Django 应用程序错误创建数据库表 OperationalError : (1005, "Can' t 创建表...)

python - Spark 和 Python 尝试使用 gensim 解析维基百科

json - 计算带有 Spark 条件的数据帧的行数

apache-spark - 如何使用 spark-ec2 解决 "Failed to determine hostname of Instance"错误?

r - 从 group1、group2、overlap_count 创建重叠矩阵?

python - 如果使用 groupby 方法满足另一列中的条件,则使用多列进行条件过滤

python - 设置 "in"运算符 : uses equality or identity?

python - ListStore/TreeStore 中的自定义对象

python - 如何对某些列进行 df.groupby(cols).apply(my_func),同时保留一些列未处理?

python - zipfile.badzipfile 即使我没有使用 pandas 读取 zip 文件