google-cloud-pubsub - Apache Airflow 与 Google 云 pubsub 的集成

标签 google-cloud-pubsub airflow

我对 Airflow 很陌生,并尝试使用 apache Airflow 与 google pubsub 的集成,我猜它是添加到“Airflow-300”JIRA 下的。如果我在这里阅读不正确,请纠正我。

另外,请问这个已经发布了吗?什么时候发布?我们正在考虑在 Google Cloud Storage 上添加通知,在发生任何文件事件时,我们希望触发 Airflow 中的一些工作流程。

我似乎找不到任何有关如何使用它的文档。

如有任何建议,我们将不胜感激。

最佳答案

Airflow 中的集成已经引入。

发布消息

from base64 import b64encode as b64e

m1 = {'data': b64e('Hello, World!'),
       'attributes': {'type': 'greeting'}
      }
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}

t1 = PubSubPublishOperator(
    topic='my_topic',
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag)

接收消息

PubSubPullSensor(
    task_id='pub_sub_wait', 
    project='my_project',
    subscription='my-subscription',
    ack_messages=True)

引用:

https://github.com/apache/incubator-airflow/commit/d231dce37d753ed196a26d9b244ddf376385de38 https://github.com/apache/incubator-airflow/commit/6645218092096e4b10fc737a62bacc2670e1d6dc

关于google-cloud-pubsub - Apache Airflow 与 Google 云 pubsub 的集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45998331/

相关文章:

google-cloud-platform - 用于监视谷歌云 pubsub 中未传递消息的 REST API

ubuntu - Airflow Worker Daemon 无故退出

python - 即使我的任务实际上已经完成, Airflow 2 仍然出错?错误 - 无法将 XCom 值序列化为 JSON

python - Airflow 快速启动不起作用

gmail-api - 收到gmail users.watch API的推送通知后如何读取用户的gmail?

google-cloud-platform - 如何将 GCP Pubsub 订阅的消息转发到另一个主题?

java - 回复kafka模板连接 header (CorrelationId)未发送到Google pub sub

python - 在 Airflow 中找不到文件

airflow - 如何检查任务 1 是否失败然后在 Airflow 中运行任务 2?

javascript - 如何从 Firebase Cloud Function 在 Google Pub/Sub 中发布消息?