apache-spark - 如何将列添加到 pyspark 数据框中的嵌套结构中?

标签 apache-spark pyspark apache-spark-sql

我有一个类似架构的数据框

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)

我想在 state 中添加列struct,即创建一个具有类似架构的数据框
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)

但相反,我得到
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |-- state.a: integer (nullable = true)

这是来自尝试
df.withColumn('state.a', val)

最佳答案

这是一种无需使用 udf 即可完成的方法:

# create example dataframe
import pyspark.sql.functions as f
data = [
    ({'fld': 0},)
]

schema = StructType(
    [
        StructField('state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)

df = sqlCtx.createDataFrame(data, schema)
df.printSchema()
#root
# |-- state: struct (nullable = true)
# |    |-- fld: integer (nullable = true)

现在使用 withColumn()并使用 lit() 添加新字段和 alias() .

val = 1
df_new = df.withColumn(
    'state', 
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)

如果嵌套结构中有很多字段,则可以使用列表推导式,使用 df.schema["state"].dataType.names获取字段名称。例如:

val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state', 
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)

引用文献
  • 我找到了一种从结构中获取字段名称的方法,而无需从 this answer 手动命名它们.
  • 关于apache-spark - 如何将列添加到 pyspark 数据框中的嵌套结构中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48777993/

    相关文章:

    python - pyspark如何与java集成?

    java - 如何创建具有 Hive 支持的 SparkSession(失败并返回 "Hive classes are not found")?

    PySpark,GraphFrames,异常引起: java. lang.ClassNotFoundException : com. typesafe.scalalogging.slf4j.LazyLogging

    python - 带有额外参数的子类 python 类继承

    apache-spark - 如何将from_json与架构作为字符串(即JSON编码的架构)一起使用?

    scala - 如何在 Intellij IDEA 上调试基于 Scala 的 Spark 程序

    apache-spark - 如何在 Spark MLlib 中为 K-means 初始化聚类中心?

    apache-spark - 使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?

    apache-spark - 在PySpark中用空数组过滤行

    apache-spark - 如何在pyspark中将行转换为字典列表?