python - PySpark使用RDD和json.load解析Json

标签 python json apache-spark pyspark

{

  "city": "Tempe",
  "state": "AZ",
  ...
  "attributes": [
    "BikeParking: True",
    "BusinessAcceptsBitcoin: False",
    "BusinessAcceptsCreditCards: True",
    "BusinessParking: {'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}",
    "DogsAllowed: False",
    "RestaurantsPriceRange2: 2",
    "WheelchairAccessible: True"
  ],
  ...
}

您好,我正在使用 PySpark,我正在尝试输出一个(状态,BusinessAcceptsBitcoin)的元组,目前我正在做:

csr = (dataset
        .filter(lambda e:"city" in e and "BusinessAcceptsBitcoin" in e)
        .map(lambda e: (e["city"],e["BusinessAcceptsBitcoin"]))
        .collect()
        )

但是这个命令失败了。如何获取“BusinessAcceptsBitcoin”和“city”字段?

最佳答案

您可以使用Dataframe和UDF来解析“attributes”字符串。

从您提供的示例数据来看,“属性”似乎不是正确的 JSON 或 Dict。

假设“attributes”只是一个字符串,下面是使用 dataframe 和 Udf 的示例代码。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
            .builder \
            .appName("test") \
            .getOrCreate()

#sample data
data=[{

  "city": "Tempe",
  "state": "AZ",
  "attributes": [
    "BikeParking: True",
    "BusinessAcceptsBitcoin: False",
    "BusinessAcceptsCreditCards: True",
    "BusinessParking: {'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}",
    "DogsAllowed: False",
    "RestaurantsPriceRange2: 2",
    "WheelchairAccessible: True"
  ]
}]
df=spark.sparkContext.parallelize(data).toDF()

用户定义的函数来解析字符串

def get_attribute(data,attribute):
    return [list_item for list_item in data if attribute in list_item][0]

注册udf

udf_get_attribute=udf(get_attribute, StringType

数据框

df.withColumn("BusinessAcceptsBitcoin",udf_get_attribute("attributes",lit("BusinessAcceptsBitcoin"))).select("city","BusinessAcceptsBitcoin").show(truncate=False)

示例输出

+-----+-----------------------------+
|city |BusinessAcceptsBitcoin       |
+-----+-----------------------------+
|Tempe|BusinessAcceptsBitcoin: False|
+-----+-----------------------------+

例如,您也可以使用相同的 udf 来查询任何其他字段

df.withColumn("DogsAllowed",udf_get_attribute("attributes",lit("DogsAllowed"))).select("city","DogsAllowed").show(truncate=False)

关于python - PySpark使用RDD和json.load解析Json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48677605/

相关文章:

python - 按特定列对矩阵进行排序(具有 2 位或更多数字)

python - 字符串替换不起作用

python - 仅具有查看权限的 Django 模型表单将所有字段排除在外

c# - 如何区分一个 RabbitMQ 队列中的两个 JSON 对象?

json - 如何使用 node.js 访问 json 对象的子元素

hadoop - Apache Hadoop Yarn - 内核利用率不足

python - 在 Python 中,如何将属性同时用作属性和方法/可调用对象?

javascript - 如何将多个信息添加到 JQuery Mobile ListView 中

apache-spark - Spark Graphframes 大数据集和内存问题

azure - 将 DataFrame 从 Azure Databricks 笔记本写入 Azure DataLake Gen2 表