apache-spark - PySpark - 添加新的嵌套列或更改现有嵌套列的值

标签 apache-spark pyspark

假设,我有一个具有以下结构行的 json 文件:

{
 "a": 1,
 "b": {
       "bb1": 1,
       "bb2": 2
      }
}

我想更改键值 bb1或添加一个新 key ,例如:bb3 .
目前,我使用 spark.read.json 将 json 文件作为 DataFrame 加载到 spark 中,使用 df.rdd.map 将 RDD 的每一行映射到 dict。然后,更改嵌套键值或添加嵌套键并将字典转换为行。最后,将 RDD 转换为 DataFrame。
工作流程如下:
def map_func(row):
  dictionary = row.asDict(True)
  adding new key or changing key value
  return as_row(dictionary) # as_row convert dict to row recursively

df = spark.read.json("json_file")
df.rdd.map(map_func).toDF().write.json("new_json_file")

这对我有用。但我担心转换 DataFrame -> RDD (Row -> dict -> Row) -> DataFrame 会降低效率。
有没有其他方法可以满足这种需求但不以效率为代价?

我使用的最终解决方案是使用 withColumn 并动态构建 b 的架构。
首先,我们可以得到b_schema来自 df 架构:
b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b')

之后,b_schema是 dict ,我们可以通过以下方式向其中添加新字段:
b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True})

然后,我们可以通过以下方式将其转换为 StructType:
new_b = StructType.fromJson(b_schema)

在 map_func 中,我们可以将 Row 转换为 dict 并填充新字段:
def map_func(row):
  data = row.asDict(True)
  data['bb3'] = data['bb1'] + data['bb2']
  return data

map_udf = udf(map_func, new_b)
df.withColumn('b', map_udf('b')).collect()

谢谢@Mariusz

最佳答案

您可以使用 map_func作为udf,因此省略了DF -> RDD -> DF的转换,仍然具有python的灵活性来实现业务逻辑。您只需要创建架构对象:

>>> from pyspark.sql.types import *
>>> new_b = StructType([StructField('bb1', LongType()), StructField('bb2', LongType()), StructField('bb3', LongType())])

然后你定义 map_func和 udf:
>>> from pyspark.sql.functions import *
>>> def map_func(data):
...     return {'bb1': 4, 'bb2': 5, 'bb3': 6}
... 
>>> map_udf = udf(map_func, new_b)

最后将此 UDF 应用于数据帧:
>>> df = spark.read.json('sample.json')
>>> df.withColumn('b', map_udf('b')).first()
Row(a=1, b=Row(bb1=4, bb2=5, bb3=6))

编辑 :

根据评论:您可以以更简单的方式向现有 StructType 添加字段,例如:
>>> df = spark.read.json('sample.json')
>>> new_b = df.schema['b'].dataType.add(StructField('bb3', LongType()))

关于apache-spark - PySpark - 添加新的嵌套列或更改现有嵌套列的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42203483/

相关文章:

java - Spark Streaming 单键并行

apache-spark - 如何从 DataFrame apache spark 中找到最大值 Alphabet?

apache-spark - Spark 累加器未显示在 Spark WebUI 中

apache-spark - Spark MLlib 多类逻辑回归中出现“输入验证失败”错误

python-3.x - 无法在elasticsearch-hadoop上设置_id

elasticsearch - 将数据从 spark 保存到 elasticsearch 时出错 - saveToEs

apache-spark - Spark中间文件存储在磁盘的什么位置?

python - 将键/值对的 Pyspark RDD 解析为 .csv 格式

apache-spark - 具有窗口功能的PySpark数据偏度

java - 如何使用 Java 检查从 Spark 结构化流中的 Kafka 获取数据?