pyspark - 将 rest api get 方法响应保存为 json 文档

标签 pyspark azure-databricks azure-data-lake-gen2

我正在使用下面的代码从 rest api 读取并将响应写入 pyspark 中的 json 文档,并将文件保存到 Azure Data Lake Gen2。当响应没有空白数据时代码工作正常,但当我尝试取回所有数据时遇到以下错误。

错误信息:ValueError:推断后无法确定某些类型

代码:

import requests
response = requests.get('https://apiurl.com/demo/api/v3/data',
                         auth=('user', 'password'))
data = response.json()
from pyspark.sql import *
df=spark.createDataFrame([Row(**i) for i in data])
df.show()
df.write.mode("overwrite").json("wasbs://<file_system>@<storage-account-name>.blob.core.windows.net/demo/data")

响应:

[
    {
        "ProductID": "156528",
        "ProductType": "Home Improvement",
        "Description": "",
        "SaleDate": "0001-01-01T00:00:00",
        "UpdateDate": "2015-02-01T16:43:18.247"
    },
    {
        "ProductID": "126789",
        "ProductType": "Pharmacy",
        "Description": "",
        "SaleDate": "0001-01-01T00:00:00",
        "UpdateDate": "2015-02-01T16:43:18.247"
    }
]

尝试修复如下架构。

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("ProductID", StringType(), True), StructField("ProductType", StringType(), True), "Description", StringType(), True), StructField("SaleDate", StringType(), True), StructField("UpdateDate", StringType(), True)])
df = spark.createDataFrame([[None, None, None, None, None]], schema=schema)
df.show()

不确定如何创建数据框并将数据写入 json 文档。

最佳答案

您可以将dataschema 变量传递给spark.createDataFrame() 然后spark 将创建一个数据框。

示例:

from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *


data=[
    {
        "ProductID": "156528",
        "ProductType": "Home Improvement",
        "Description": "",
        "SaleDate": "0001-01-01T00:00:00",
        "UpdateDate": "2015-02-01T16:43:18.247"
    },
    {
        "ProductID": "126789",
        "ProductType": "Pharmacy",
        "Description": "",
        "SaleDate": "0001-01-01T00:00:00",
        "UpdateDate": "2015-02-01T16:43:18.247"
    }
]

schema = StructType([StructField("ProductID", StringType(), True), StructField("ProductType", StringType(), True), StructField("Description", StringType(), True), StructField("SaleDate", StringType(), True), StructField("UpdateDate", StringType(), True)])


df = spark.createDataFrame(data, schema=schema)

df.show()
#+---------+----------------+-----------+-------------------+--------------------+
#|ProductID|     ProductType|Description|           SaleDate|          UpdateDate|
#+---------+----------------+-----------+-------------------+--------------------+
#|   156528|Home Improvement|           |0001-01-01T00:00:00|2015-02-01T16:43:...|
#|   126789|        Pharmacy|           |0001-01-01T00:00:00|2015-02-01T16:43:...|
#+---------+----------------+-----------+-------------------+--------------------+

关于pyspark - 将 rest api get 方法响应保存为 json 文档,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63493128/

相关文章:

python - 如何在 PySpark 中将字典转换为数据框?

azure - 如何从 Azure Databricks 将 JSON 写入 Azure 队列

azure - 对于高级、低延迟、带有搜索功能的大量小型 json 文件,Azure BlockBlobStorage 还是通用 v2 更好?

powerbi - 使用 SAS token Power BI 连接到 Azure Data Lake Storage Gen 2

apache-spark - sc.parallelize 和 sc.textFile 有什么区别?

apache-spark - 如果某些值为 null,则在 SUM 中返回 null

azure-data-factory - 独立于 git 用户名运行通过 ADF 连接到 git 的 databricks 笔记本

databricks - Azure Databricks 群集问题

Azure 存储帐户在 Data Lake Gen2 验证中停留在 0%

python - 当新数据到来时,如何重新训练 pyspark 中保存的线性回归 ML 模型