json - 如何将行合并到 Spark 数据框的列中作为有效的json以将其写入mysql

标签 json python-2.7 apache-spark pyspark apache-spark-sql

我正在尝试将多行合并为一列,作为 spark 数据框 (spark 1.6.1) 中的有效 json 格式。然后我希望它存储在 mysql 表中。

我的原始 Spark 数据框如下:

|user_id   |product_id|price       | 
|A         |p1        |3000        |
|A         |p2        |1500        |
|B         |P1        |3000        |
|B         |P3        |2000        |

我想像这样转换上表:

|user_id   |contents_json 
|A         |{(product_id:p1, price:3000), (product_id:p2, price:1500)} 
|B         |{{product_id:p1, price:3000), (product_id:p3, price:2000)} 

然后把上面的表放到mysql表中。

这是完全相反的爆炸方式,但我找不到正确的方式。

最佳答案

我假设您正在寻找下面显示的 JSON 输出。

from pyspark.sql.functions import col, collect_list, struct

df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
                     ('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])

> Spark2.0

df1 = df.\
    groupBy("user_id").agg(collect_list(struct(col("product_id"),col("price"))).alias("contents_json"))
df1.show()

Spark1.6

zipCols = psf.udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("product_id", StringType()),
      StructField("price", IntegerType())
  ]))
)

df1 = df.\
    groupBy("user_id").agg(
        zipCols(
            collect_list(col("product_id")), 
            collect_list(col("price"))
        ).alias("contents_json")
    )

for row in df1.toJSON().collect():
    print row

输出是:

{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}

关于json - 如何将行合并到 Spark 数据框的列中作为有效的json以将其写入mysql,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46121507/

相关文章:

javascript - 如何模仿 ASP.NET AjaxOptions 委托(delegate)函数?

java - POST 到 Jersey REST 服务得到错误 415 Unsupported Media Type

python - 更Pythonic的方式来嵌套函数

amazon-web-services - Spark Streaming 使用 S3 与 Kinesis

python - 计算 PySpark 中 Spark 数据帧每列中非 NaN 条目的数量

apache-spark - 从平均序列预测下一个事件

java - 使用 GSON 的 JSON 反序列化会跳过 HashMap 内的 HashMap 中的数据成员

python - 写入和打印产生不同的结果

python - python 中的 decimal.InvalidOperation

python-2.7 - GCP 发布订阅 : Synchronous Pull Subscriber in Python?