我是 Airflow 新手。每当 SQS 中有消息时,我都会尝试运行我的 dag。 我正在使用 SQSSensor 来做同样的事情。它在第一次运行时触发,但之后当有新消息时它不会调用。
如果我遗漏了什么,请告诉我。
default_args = {
'owner': 'Airflow',
'start_date': days_ago(2),
'provide_context': True,
}
dag = DAG('sqs_test', default_args=default_args, schedule_interval='@daily')
task = SQSSensor(
task_id='sqs_test',
poke_interval=0,
timeout=10,
sqs_queue='https://sqs.us-east-1.amazonaws.com/accountid/test',
aws_conn_id='aws_default',
max_message=1,
dag=dag)
最佳答案
它不会在每条新消息时调用,因为计划间隔设置为@daily。因此它将在第二天运行(按照计划间隔),然后从 SQS 队列获取消息。
需要注意的是:SQSSensor 只是 SQS Que 上的传感器,基本上用于轮询 que。 DAG 的触发仍然是我们在 dag 定义中指定的时间表。
关于airflow - 当有新消息时, Airflow 中的 Sqs 传感器不会触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63275247/