python - 使用 Databricks 将 Google Api 的结果写入数据湖

标签 python apache-spark azure-data-lake databricks google-api-python-client

我正在通过 Databricks 上的 Python SDK 从 Google 管理报告用户使用情况 Api 获取用户使用情况数据。数据大小约为每天 100 000 条记录,我通过批处理处理了一晚上。 API 返回的最大页面大小为 1000,因此我粗略地将其称为 1000,以获取当天所需的数据。这工作正常。

我的最终目标是将数据以其原始格式存储在数据湖中(Azure Gen2,但与这个问题无关)。稍后,我将使用 Databricks 将数据转换为聚合报告模型,并将 PowerBI 置于其之上,以跟踪一段时间内 Google 应用的使用情况。

作为一名 C# 程序员,我对 Python 和 Spark 很陌生:我目前的做法是从 api 请求第一页 1000 条记录,然后将其作为 JSON 文件直接写入 datalake,然后获取下一页集并也写这个。文件夹结构类似于“\raw\googleuser\YYYY\MM\DD\data1.json”。

我想在原始区域中保留数据的最原始形式,并且不应用太多转换。第二个进程可以提取我需要的字段,用元数据对其进行标记,并将其写回 Parquet,以供函数使用。这就是为什么我考虑将其写为 JSON。

这意味着第二个进程需要将 JSON 读入数据帧,我可以在其中对其进行转换并将其写入 Parquet (这部分也很简单)。

因为我使用的是 Google Api,所以我不使用 Json - 它返回 dict 对象(具有复杂的嵌套)。我可以使用 json.dump() 将其提取为 Json 字符串,但我无法弄清楚如何将字符串直接写入我的数据湖。一旦我将它放入数据帧中,我就可以轻松地以任何格式写入它,但是将其从 Json 转换为数据帧,然后本质上返回 Json 来写入它似乎是一种性能开销。

以下是我尝试过的方法和结果:

  1. 构建 pyspark.sql.Rows 列表,并在所有分页(100k 行)结束时 - 使用spark.createDataFrame(rows) 将其转换为数据帧。一旦它是一个数据框,我就可以将它保存为 Json 文件。这可行,但似乎效率低下。
  2. 使用 json.dump(request) 获取一串 Json 格式的 1000 条记录。我可以使用以下代码将其写入 Databricks 文件系统:

    with open("/dbfs/tmp/googleuserusagejsonoutput-{0}.json".format(keyDateFilter), 'w') as f: f.write(json.dumps(response))

    但是,我必须将其移动到我的 Azure 数据湖:

    dbutils.fs.cp("/tmp/test_dbfs1.txt", datalake_path + dbfs_path + "xyz.json")

    然后我获取接下来的 1000 条记录并继续这样做。我似乎无法使用数据湖存储(Azure abfss 驱动程序)的 open() 方法目录,否则这将是一个不错的解决方案。先将其转储到本地然后再移动它,这似乎很脆弱且奇怪。

  3. 与选项 1 相同,但每 1000 条记录将数据帧转储到数据湖并覆盖它(以便内存一次增加的记录不会超过 1000 条记录)

  4. 忽略转储原始 Json 的规则。将数据调整为我想要的最简单的格式,并删除所有我不需要的额外数据。这将导致占用空间小得多,然后将遵循上面的选项 1 或 3。 (这是第二个问题 - 以原始格式保存来自 Api 的所有数据的原则,以便随着需求随着时间的推移而变化,我总是在数据湖中拥有历史数据,并且只需更改转换例程即可从其中提取不同的指标它。因此我不愿意在这个阶段放弃任何数据。

如有任何建议,请表示感谢...

最佳答案

将 Lake 安装到您的 databricks 环境中,这样您就可以将其保存到 Lake,就像它是普通文件夹一样:

with open('/dbfs/mnt/mydatalake/googleuserusagejsonoutput-{0}.json', 'wb') as f:
            json.dump(data, codecs.getwriter('utf-8')(f), sort_keys = True, indent = 4, ensure_ascii=False)
            f.close()

您只需登上湖泊一次:

https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake-gen2.html#mount-the-azure-data-lake-storage-gen2-filesystem-with-dbfs

话虽如此,

以json格式存储大数据并不是最优的;对于每个值(单元格),您都存储键(列名称),因此您的数据将比需要的大得多。此外,您可能应该有一个重复数据删除功能来确保:(1) 数据中没有间隙,(2) 您不会在多个文件中存储相同的数据。 Databricks delta 负责处理这个问题。

https://docs.databricks.com/delta/delta-intro.html

关于python - 使用 Databricks 将 Google Api 的结果写入数据湖,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55628005/

相关文章:

javascript - 如何在内联字段 Django 中添加属性?

python - 如何删除 django 中级联的一对一关联模型?

maven - 将 Spark 源代码导入 intellij,构建错误 : not found: type SparkFlumeProtocol and EventBatch

java - 使用 Statistic.stat 时如何避免收集

json - 用于 JSON 转换的 U-SQL 脚本

python - 递归函数返回值保留以前运行的结果

python - 执行内容协商的模拟 HTTP 服务器

jdbc - 将 JDBC 驱动程序添加到 EMR 上的 Spark

python-3.x - 获取 Azure Databricks 中文件夹和文件的上次修改日期

json - 将 JSON 数据从 DocumentDB(或 CosmosDB)移动到 Azure Data Lake