python - 以 map 类型创建 DataFrame 分组列

标签 python apache-spark dictionary pyspark apache-spark-sql

我的 DataFrame 具有以下结构:

df = spark.createDataFrame(
    [('B', 'a', 10),
     ('B', 'b', 20),
     ('C', 'c', 30)],
    ['Brand', 'Type', 'Amount'])
df.show()
# +-----+----+------+
# |Brand|Type|Amount|
# +-----+----+------+
# |    B|   a|    10|
# |    B|   b|    20|
# |    C|   c|    30|
# +-----+----+------+

我想通过将 typeamount 分组到 type 的单个列中来减少行数:Map。 所以 Brand 将是唯一的,并且 MAP_type_AMOUNT 将为每个 type amount 提供 key,value > 组合。

我想,spark.sql 可能有一些功能可以做到这一点,还是我必须使用 RDD 并“自己”转换为 map 类型?

预期输出:

---------------------------
| Brand | MAP_type_AMOUNT |
---------------------------
|  B    |  {a: 10, b:20}  |
|  C    |  {c: 30}        |
---------------------------

最佳答案

Prem's 略有改进回答(对不起,我还不能发表评论)

使用 func.create_map 而不是 func.struct。参见 documentation

import pyspark.sql.functions as func
df = sc.parallelize([('B','a',10),('B','b',20),
('C','c',30)]).toDF(['Brand','Type','Amount'])

df_converted = df.groupBy("Brand").\
    agg(func.collect_list(func.create_map(func.col("Type"),
    func.col("Amount"))).alias("MAP_type_AMOUNT"))

print df_converted.collect()

输出:

[Row(Brand=u'B', MAP_type_AMOUNT=[{u'a': 10}, {u'b': 20}]),
 Row(Brand=u'C', MAP_type_AMOUNT=[{u'c': 30}])]

关于python - 以 map 类型创建 DataFrame 分组列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45532183/

相关文章:

python - 如何在python中控制系统上是否安装了库

c++ - 我无法声明 map

apache-spark - 如何对多个 Spark 作业并行执行多个 Kafka 主题

sql - 使用 Spark DataFrame 获取一组后所有组的 TopN

scala - 作为参数传递的空参数函数

python - Go 中的字典

swift - 如何用 += 运算符重载连接两个字典

python - 同步函数内的 asyncio.run 返回 None

python - 基于协程的状态机

python - 在 if 语句中为 1 到 50 之间的任何数字制作通配符整数