python-3.x - 外部触发 d​​ag,其 Schedule_interval=None

标签 python-3.x airflow airflow-scheduler

我有一个名为dss_controller的 Controller dag

dag = DAG(
dag_id='dss_controller',
default_args={
    "owner": "dss admin",
    "start_date": datetime.utcnow(),
},
schedule_interval=None,
)

和一个名为dss_trigger_target_dag的目标 dag

dag = DAG(
dag_id='dss_trigger_target_dag',
default_args=args,
schedule_interval=None,
)

任务在 Controller 和目标 dag 中定义,如默认可用示例中所示。

dss_controllerschedule_interval设置为“@once”时,该系统按预期工作。

然后我将其设置为并从外部触发。它将触发 Controller dag 并将其移至运行状态,然后将其移至成功状态。

但它不会触发 Controller dag 的 dss_trigger_dagrun 任务。 这种行为的原因是什么?

设置schedule_interval=没有原因,如果是的话怎么办?

这是我的 Controller ,

import pprint
import pprint
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
   """This function decides whether or not to Trigger the remote DAG"""
   c_p = context['params']['condition_param']
   print("Controller DAG : conditionally_trigger = {}".format(c_p))
   if context['params']['condition_param']:
      dag_run_obj.payload = {'message': context['params']['message']}
      pp.pprint(dag_run_obj.payload)
      return dag_run_obj

   # Define the DAG
   dag = DAG(
      dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": datetime.utcnow(),
    },
    schedule_interval=None,
)

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
    task_id='dss_trigger_dagrun',
    trigger_dag_id="dss_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello Hasitha'},
    dag=dag,
)

这是我的目标,

import pprint
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

args = {
    'start_date': datetime.utcnow(),
    'owner': 'dss admin',
}

dag = DAG(
    dag_id='dss_trigger_target_dag',
    default_args=args,
    schedule_interval=None,
)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='target_run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

最佳答案

您已在默认参数中添加了“start_date”作为 now(),该参数适用于每个任务。看来,这才是真正的罪魁祸首。 Airflow 建议不要使用它,因为它可以防止触发任务。 尝试将开始日期设置为过去的日期,例如airflow.utils.dates.days_ago(1)。


引用:https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date

关于python-3.x - 外部触发 d​​ag,其 Schedule_interval=None,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57160334/

相关文章:

python - 如何在Windows 10 中备份anaconda 环境?

python - 如何在 Airflow 中使用 PythonVirtualenvOperator?

python - Airflow 工作器 - 连接中断 : IncompleteRead(0 bytes read)

rest - Apache Airflow - 休息 API 身份验证

python - 原子Python : Functions not recognised

python - 使用 Cnn 和 Lstm 提取图像字幕生成器的特征?

python - 投注算法,特别是赢得赌注的算法?

airflow - 如何限制 Airflow 一次仅运行一个 DAG 实例?

google-cloud-platform - 更改 Cloud Composer 的 DAG 的默认 GCS 位置

postgresql - 无法运行 Airflow 调度程序