python - 如何以不同的时间计划间隔运行子标签?

标签 python airflow directed-acyclic-graphs

我有 Airflow 的 DAG,其中包含以下任务:

  1. 将 csv 添加到暂存表 (t1)
  2. 从主表 (t2) 中删除旧记录
  3. 将最新数据添加到表 (t3)

以及一个每天结束时运行的子标签(晚上 11.59 或 24 小时制 23.59)。前三个任务必须首先运行,然后子任务才会运行

t1 >> t2 >> t3 >> subdag

问题是,前 3 个任务运行良好,但 subdag 却不行。我先重新启动而不是给出错误标志。我也无法检查错误的位置和原因。

我尝试重新定义子dag中的schedule_interval以遵循预期结果,从12 * * * *59 12 * * *。我也尝试从这篇博客文章https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb :

这是 dag default_dag_args 代码:

DAG_NAME = 'order_bid'

...
default_dag_args = {
    'start_date': start_date,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': models.Variable.get('XXXXXXXXXXX')
}

这是任务声明的示例:

task_add_order_bid = bigquery_operator.BigQueryOperator(
    task_id='add_order_bid',
    bql=order_bid.sql_itop_order_bid.format(
        table_order_bid_stg=table_order_bid_stg,
        date_from=date_from.strftime("%Y-%m-%d")
        ),
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    destination_dataset_table=table_order_bid,
    dag=dag,

)

这是我尝试声明的子标签:

subdag_daily_bid = SubDagOperator(
    subdag=daily.dailyBidding(
        DAG_NAME,
        "daily_order_bid",
        start_date,
        dt_wib),
    task_id="daily_order_bid",
dag = dag)

但是当我想更改时间表时,我会像这样声明我的子dag:

def dailyBidding(parent_dag, child_dag, start_date, task_date):
    dag = models.DAG(
        '%s.%s' % (parent_dag, child_dag),
        schedule_interval='59 12 * * *',
        start_date=start_date
        )

    date_from = task_date - timedelta(days=1)

    task_del_taxi_order_bid_daily = bigquery_operator.BigQueryOperator(
    task_id='del_daily_order_bid',
    bql=sql_del_partition_order_bid_daily.format(
        table_order_bid=table_order_bid_master,
        date_from=date_from.strftime("%Y-%m-%d")),
    use_legacy_sql=False,
    dag=subdag)

task_add_daily_order_bid = bigquery_operator.BigQueryOperator(
    task_id='add_daily_order_bid',
    bql=daily.sql_add_daily.format(
        source = table_order_bid_master,
        yesterday = date_from.strftime("%Y-%m-%d"),
        monthly = a_month_ago.strftime("%Y-%m-%d")),
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    destination_dataset_table=table_daily_order_bid,
    dag=subdag)


task_del_taxi_order_bid_daily >> task_add_daily_order_bid

    return dag

我预计我的 dag 在 12.59 工作,但它仍在等待运行并遵循 parent 的时间表。

最佳答案

据天文学家.io post ,子DAG 必须与其父DAG 具有相同的计划。

一般来说,如果您需要让 DAG 的一部分按照与其他部分不同的时间表运行,有几种方法可以实现这一点:

  • 您可以使用short circuit operator仅当日期戳(例如 ds)与您想要的频率匹配时才继续使用 subdag。因此您的 DAG 可以每天运行,但只在星期一执行 subdag 步骤
  • 您可以将 subdag 分解为一个完全不同的 DAG,将该 DAG 设置为每周计划并使用 sensor检查并等待数据,或者只是普通的 PythonOperator 检查并在数据不存在时失败。

我发现第二种方法。如果所需的数据不存在,则失败消息对我来说是一个有用的信号,表明存在更大的错误。

关于python - 如何以不同的时间计划间隔运行子标签?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56948555/

相关文章:

python - 还有其他方法可以连接到 Airflow 中的 Google 表格吗?

python - 如何通过单个脚本生成多个 Airflow dags?

algorithm - 棋盘上国王的最短路径

python - Google Foobar 挑战 Power Hungry - 未通过测试。 5 个测试用例中的 3 个 [隐藏]?

python - pandas DataFrame 仅针对一列进行转置/熔化/旋转

python - App Engine 单元测试 : ImportError: Start directory is not importable

python - Airflow:如何从 Postgres Operator 插入 xcom 值(value)?

python - REST api 中的普通(非 HTML)错误页面

postgresql - Airflow psycopg2.OperationalError : FATAL: sorry, 已经有太多客户端

java - 如何在 Airflow 中运行 Spark 代码?