airflow - 当有新消息时, Airflow 中的 Sqs 传感器不会触发

标签 airflow amazon-sqs job-scheduling

我是 Airflow 新手。每当 SQS 中有消息时,我都会尝试运行我的 dag。 我正在使用 SQSSensor 来做同样的事情。它在第一次运行时触发,但之后当有新消息时它不会调用。

如果我遗漏了什么,请告诉我。

default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
    'provide_context': True,
}

dag = DAG('sqs_test', default_args=default_args, schedule_interval='@daily')

task = SQSSensor(
    task_id='sqs_test',
    poke_interval=0,
    timeout=10,
    sqs_queue='https://sqs.us-east-1.amazonaws.com/accountid/test',
    aws_conn_id='aws_default',
    max_message=1,
    dag=dag)

最佳答案

它不会在每条新消息时调用,因为计划间隔设置为@daily。因此它将在第二天运行(按照计划间隔),然后从 SQS 队列获取消息。

需要注意的是:SQSSensor 只是 SQS Que 上的传感器,基本上用于轮询 que。 DAG 的触发仍然是我们在 dag 定义中指定的时间表。

关于airflow - 当有新消息时, Airflow 中的 Sqs 传感器不会触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63275247/

相关文章:

airflow - DAG 中的新任务会阻止进一步的 DAG 执行

Airflow - 实验性 API 为某些端点返回 405

json - 将变量传递给 aws cli 内联 json

ruby - sleep() 对于作业调度应用程序的主循环来说是个好主意吗

airflow - 如何检查 Airflow 测试的输出?

python-2.7 - Airflow 无法初始化数据库

python - AWS SQS ReceiveMessage 收到的消息少于请求的消息?

java - SQSListener 不消耗队列中的消息

java - 使用 dropwizard-sundial 安排工作

multithreading - 将作业动态分配给一组共享资源上的处理器