{
"city": "Tempe",
"state": "AZ",
...
"attributes": [
"BikeParking: True",
"BusinessAcceptsBitcoin: False",
"BusinessAcceptsCreditCards: True",
"BusinessParking: {'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}",
"DogsAllowed: False",
"RestaurantsPriceRange2: 2",
"WheelchairAccessible: True"
],
...
}
您好,我正在使用 PySpark,我正在尝试输出一个(状态,BusinessAcceptsBitcoin)的元组,目前我正在做:
csr = (dataset
.filter(lambda e:"city" in e and "BusinessAcceptsBitcoin" in e)
.map(lambda e: (e["city"],e["BusinessAcceptsBitcoin"]))
.collect()
)
但是这个命令失败了。如何获取“BusinessAcceptsBitcoin”和“city”字段?
最佳答案
您可以使用Dataframe和UDF来解析“attributes”字符串。
从您提供的示例数据来看,“属性”似乎不是正确的 JSON 或 Dict。
假设“attributes”只是一个字符串,下面是使用 dataframe 和 Udf 的示例代码。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
.builder \
.appName("test") \
.getOrCreate()
#sample data
data=[{
"city": "Tempe",
"state": "AZ",
"attributes": [
"BikeParking: True",
"BusinessAcceptsBitcoin: False",
"BusinessAcceptsCreditCards: True",
"BusinessParking: {'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}",
"DogsAllowed: False",
"RestaurantsPriceRange2: 2",
"WheelchairAccessible: True"
]
}]
df=spark.sparkContext.parallelize(data).toDF()
用户定义的函数来解析字符串
def get_attribute(data,attribute):
return [list_item for list_item in data if attribute in list_item][0]
注册udf
udf_get_attribute=udf(get_attribute, StringType
数据框
df.withColumn("BusinessAcceptsBitcoin",udf_get_attribute("attributes",lit("BusinessAcceptsBitcoin"))).select("city","BusinessAcceptsBitcoin").show(truncate=False)
示例输出
+-----+-----------------------------+
|city |BusinessAcceptsBitcoin |
+-----+-----------------------------+
|Tempe|BusinessAcceptsBitcoin: False|
+-----+-----------------------------+
例如,您也可以使用相同的 udf 来查询任何其他字段
df.withColumn("DogsAllowed",udf_get_attribute("attributes",lit("DogsAllowed"))).select("city","DogsAllowed").show(truncate=False)
关于python - PySpark使用RDD和json.load解析Json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48677605/