python - 作为 newAPIHadoopRDD 加载的数据可以转换为 DataFrame 吗?

标签 python apache-spark google-bigquery pyspark

我正在使用 PySpark 从 Google BigQuery 加载数据。

我已使用以下方式加载数据:

dfRates = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

其中conf定义为https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example .

我需要这些数据作为 DataFrame,所以我尝试了,

row = Row(['userId','accoId','rating']) # or row = Row(('userId','accoId','rating'))
dataRDD = dfRates.map(row).toDF()

dataRDD = sqlContext.createDataFrame(dfRates,['userId','accoId','rating'])

但它不会将数据转换为 DataFrame。有没有办法将其转换为DataFrame?

最佳答案

只要类型可以使用 Spark SQL 类型表示,就没有理由不能。这里唯一的问题似乎是你的代码。

newAPIHadoopRDD 返回一个 RDD 对(长度等于 2 的元组)。在这个特定的上下文中,看起来你会在 Python 中得到 (int, str) ,它显然无法解压到 ['userId','accoId',' rating'] .

根据您链接的文档,com.google.gson.JsonObject 表示为 JSON 字符串,可以使用标准 Python utils 在 Python 端进行解析 (json模块):

def parse(v, fields=["userId", "accoId", "rating"]):
    row = Row(*fields)
    try:
        parsed = json.loads(v)
    except json.JSONDecodeError:
        parsed = {}
    return row(*[parsed.get(x) for x in fields])

dfRates.map(parse).toDF()

或者在 Scala/DataFrame 端使用 get_json_object:

from pyspark.sql.functions import col, get_json_object

dfRates.toDF(["id", "json_string"]).select(
  # This assumes you expect userId field
  get_json_object(col("json_string"), "$.userId"), 
  ...
)

请注意我用来定义和创建行的语法的差异。

关于python - 作为 newAPIHadoopRDD 加载的数据可以转换为 DataFrame 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36226294/

相关文章:

python - 比较数组,找到相同元素并返回索引

python - 读取并打印字符串 x 次

java - SQLcontext 将字符串字段更改为 Long : Spark 1. 5

google-bigquery - 提取 BigQuery 分区表

python - python中异步任务和同步线程之间的通信

python - 如何在不使用Python循环的情况下创建引用数据框和字典的当前列的条件列?

python - 如何spark-submit存储在GCP存储桶中的.py文件?

apache-spark - Pyspark 加入然后列选择显示意外输出

google-bigquery - BigQuery : Specify a condition in count()

mysql - Apache Airflow - MySQL 到 BigQuery - 如何获取上个月的数据?