airflow - 使用 Pub/Sub 消息触发 Cloud Composer DAG

标签 airflow google-cloud-pubsub directed-acyclic-graphs google-cloud-composer

我正在尝试创建一个通过发布/订阅消息触发的 Cloud Composer DAG。 Google 提供了以下示例,每当 Cloud Storage 存储桶中发生更改时都会触发 DAG: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

但是,一开始他们说您可以触发 DAG 以响应事件,例如 Cloud Storage 存储桶中的更改或推送到 Cloud Pub/Sub 的消息。我花了很多时间试图找出如何做到这一点,但没有结果。

你能帮忙或给我一些指示吗?谢谢!

最佳答案

有两种方法可以通过 Pub/Sub 事件触发 DAG。

  1. 您可以放置​​一个 PubSubPullSensor在你的 DAG 的开头。每次 PubSubPullSensor 可以拉取 Pub/Sub 消息时,您的 DAG 都会被触发。它将执行您的 DAG 中的其余任务。
  2. 您还可以创建一个 Cloud Function that acts as Pub/Sub trigger .并把 Composer DAG triggering logic在 Cloud Function 触发器中。当消息发布到 Pub/Sub 主题时,Cloud Function 应该能够触发 Composer DAG。

关于airflow - 使用 Pub/Sub 消息触发 Cloud Composer DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58551125/

相关文章:

google-cloud-platform - gcloud-java pubsub API : how to set "Return Immediately" flag

database-design - 在关系数据库中可以有循环外键依赖吗?

Airflow 弃用警告传递了无效参数

python - Airflow 异常: DataFlow failed with return code 1

python - 运行简单的 Airflow BashOperator 时出现 TemplateNotFound 错误

algorithm - 如何找到 DAG 中两个顶点之间的最大权重路径?

java - 检测有向无环图中的循环引用

Airflow SFTPHook - 找不到主机的主机 key

amazon-web-services - 使用 Google Cloud Pub/Sub 时如何在 AWS/SQS 中实现 "locked"功能?

django - 如何在 Google Cloud App Engine 上使用 PubSub 创建订阅者,通过 Publisher 从 Google Cloud App Engine Flex 收听消息?