我正在使用 PySpark 从 Google BigQuery 加载数据。
我已使用以下方式加载数据:
dfRates = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
其中conf定义为https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example .
我需要这些数据作为 DataFrame,所以我尝试了,
row = Row(['userId','accoId','rating']) # or row = Row(('userId','accoId','rating'))
dataRDD = dfRates.map(row).toDF()
和
dataRDD = sqlContext.createDataFrame(dfRates,['userId','accoId','rating'])
但它不会将数据转换为 DataFrame。有没有办法将其转换为DataFrame?
最佳答案
只要类型可以使用 Spark SQL 类型表示,就没有理由不能。这里唯一的问题似乎是你的代码。
newAPIHadoopRDD
返回一个 RDD 对(长度等于 2 的元组)。在这个特定的上下文中,看起来你会在 Python 中得到 (int, str)
,它显然无法解压到 ['userId','accoId',' rating']
.
根据您链接的文档,com.google.gson.JsonObject
表示为 JSON 字符串,可以使用标准 Python utils 在 Python 端进行解析 (json
模块):
def parse(v, fields=["userId", "accoId", "rating"]):
row = Row(*fields)
try:
parsed = json.loads(v)
except json.JSONDecodeError:
parsed = {}
return row(*[parsed.get(x) for x in fields])
dfRates.map(parse).toDF()
或者在 Scala/DataFrame
端使用 get_json_object
:
from pyspark.sql.functions import col, get_json_object
dfRates.toDF(["id", "json_string"]).select(
# This assumes you expect userId field
get_json_object(col("json_string"), "$.userId"),
...
)
请注意我用来定义和创建行的语法的差异。
关于python - 作为 newAPIHadoopRDD 加载的数据可以转换为 DataFrame 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36226294/