我对 Airflow 很陌生,并尝试使用 apache Airflow 与 google pubsub 的集成,我猜它是添加到“Airflow-300”JIRA 下的。如果我在这里阅读不正确,请纠正我。
另外,请问这个已经发布了吗?什么时候发布?我们正在考虑在 Google Cloud Storage 上添加通知,在发生任何文件事件时,我们希望触发 Airflow 中的一些工作流程。
我似乎找不到任何有关如何使用它的文档。
如有任何建议,我们将不胜感激。
最佳答案
Airflow 中的集成已经引入。
发布消息
from base64 import b64encode as b64e
m1 = {'data': b64e('Hello, World!'),
'attributes': {'type': 'greeting'}
}
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}
t1 = PubSubPublishOperator(
topic='my_topic',
messages=[m1, m2, m3],
create_topic=True,
dag=dag)
接收消息
PubSubPullSensor(
task_id='pub_sub_wait',
project='my_project',
subscription='my-subscription',
ack_messages=True)
引用:
https://github.com/apache/incubator-airflow/commit/d231dce37d753ed196a26d9b244ddf376385de38 https://github.com/apache/incubator-airflow/commit/6645218092096e4b10fc737a62bacc2670e1d6dc
关于google-cloud-pubsub - Apache Airflow 与 Google 云 pubsub 的集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45998331/