python - Airflow DAG - 如何先检查 BQ(必要时删除)然后运行数据流作业?

标签 python google-cloud-platform google-bigquery airflow google-cloud-composer

我正在使用 Cloud Composer 为到达 GCS 并转到 BigQuery 的文件编排 ETL。我有一个云函数,它在文件到达时触发 dag,并且云函数将文件名/位置传递给 DAG。在我的 DAG 中,我有 2 个任务:

1) 使用 DataflowPythonOperator 运行数据流作业,该作业从 GCS 中的文本读取数据并将其转换并输入到 BQ,以及 2) 根据作业是失败还是成功将文件移动到失败/成功存储桶。 每个文件都有一个文件 ID,它是 bigquery 表中的一列。有时一个文件会被编辑一两次(这不是经常发生的流式传输),我希望能够先删除该文件的现有记录。

我查看了其他 Airflow 运算符,但想在运行数据流作业之前在我的 DAG 中执行 2 个任务:

  1. 根据文件名获取文件 ID(现在我有一个 bigquery 表映射文件名 -> 文件 ID,但我也可以只引入一个 json 作为映射,我猜如果那样更容易的话)
  2. 如果文件 ID 已经存在于 bigquery 表(从数据流作业输出转换后的数据的表)中,请将其删除,然后运行数据流作业这样我就有了最新的信息.我知道一个选择是只添加一个时间戳并且只使用最新的记录,但是因为每个文件可能有 100 万条记录而且它不像我每天删除 100 个文件(可能是 1-2 个顶部)看起来它可能是困惑和困惑的。

在数据流作业之后,最好是在将文件移动到成功/失败文件夹之前,我想附加一些“记录”表,说明这个游戏是在此时输入的。这将是我查看发生的所有插入的方式。 我试图寻找不同的方法来做到这一点,我是 cloud composer 的新手,所以在经过 10 多个小时的研究后,我并不清楚这将如何工作,否则我会发布输入代码。

谢谢,我非常感谢大家的帮助,如果这不是你想要的那么清楚,我深表歉意,关于 Airflow 的文档非常强大,但考虑到云 Composer 和 bigquery 相对较新,很难彻底了解如何执行一些 GCP 特定任务。

最佳答案

听起来有点复杂。令人高兴的是,几乎所有 GCP 服务都有运营商。另一件事是什么时候触发 DAG 执行。你想出来了吗?每次有新文件进入该 GCS 存储桶时,您都希望触发 Google Cloud Functions 运行。

  1. 触发您的 DAG

要触发 DAG,您需要使用依赖于 Object Finalize 的 Google 云函数来调用它或 Metadata Update触发器。

  1. 将数据加载到 BigQuery

如果您的文件已经在 GCS 中,并且是 JSON 或 CSV 格式,那么使用数据流作业就有点过分了。您可以使用 GoogleCloudStorageToBigQueryOperator将文件加载到 BQ。

  1. 跟踪文件 ID

计算文件 ID 的最佳方法可能是使用 Airflow 中的 Bash 或 Python 运算符。你能直接从文件名中推导出来吗?

如果是这样,那么您可以拥有一个位于 GoogleCloudStorageObjectSensor 上游的 Python 运算符检查文件是否在成功目录下。

如果是,那么您可以使用 BigQueryOperator在 BQ 上运行删除查询。

之后,您运行 GoogleCloudStorageToBigQueryOperator。

  1. 四处移动文件

如果您要将文件从 GCS 移动到 GCS 位置,则 GoogleCloudStorageToGoogleCloudStorageOperator应该做你需要的把戏。如果您的 BQ 加载运算符失败,则移动到失败的文件位置,如果成功,则移动到成功的作业位置。

  1. 记录任务日志

也许您跟踪插入所需要做的就是将任务信息记录到 GCS。查看how to log task information to GCS

这有帮助吗?

关于python - Airflow DAG - 如何先检查 BQ(必要时删除)然后运行数据流作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54114363/

相关文章:

python - 如何导入图片并将其放在 pygame 窗口中

c# - Python .Net 未加载 .Net Standard 2.0 dll

categorical-data - 指定 patsy/statsmodels 生成的类别的名称形式 'C'

java - 通过 Gmail API Google App Engine 发送电子邮件

google-bigquery - 如何对 BigQuery 中的重复字段进行分组

SQL 循环从不同的开始和结束日期生成许多行

Python:float(2**53+3) 是什么

google-cloud-platform - googleapi : Error 503: Policy checks are unavailable. , 后端错误

google-cloud-platform - 如何获取 Cloud Build 的最新更新?

google-bigquery - BigQuery Transfer 与 BigQuery 负载