我有一个名为dss_controller的 Controller dag
dag = DAG(
dag_id='dss_controller',
default_args={
"owner": "dss admin",
"start_date": datetime.utcnow(),
},
schedule_interval=None,
)
和一个名为dss_trigger_target_dag的目标 dag
dag = DAG(
dag_id='dss_trigger_target_dag',
default_args=args,
schedule_interval=None,
)
任务在 Controller 和目标 dag 中定义,如默认可用示例中所示。
当dss_controller的schedule_interval设置为“@once”时,该系统按预期工作。
然后我将其设置为无并从外部触发。它将触发 Controller dag 并将其移至运行状态,然后将其移至成功状态。
但它不会触发 Controller dag 的 dss_trigger_dagrun 任务。 这种行为的原因是什么?
设置schedule_interval=没有原因,如果是的话怎么办?
这是我的 Controller ,
import pprint
import pprint
from datetime import datetime
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
pp = pprint.PrettyPrinter(indent=4)
def conditionally_trigger(context, dag_run_obj):
"""This function decides whether or not to Trigger the remote DAG"""
c_p = context['params']['condition_param']
print("Controller DAG : conditionally_trigger = {}".format(c_p))
if context['params']['condition_param']:
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
# Define the DAG
dag = DAG(
dag_id='dss_controller',
default_args={
"owner": "dss admin",
"start_date": datetime.utcnow(),
},
schedule_interval=None,
)
# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
task_id='dss_trigger_dagrun',
trigger_dag_id="dss_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello Hasitha'},
dag=dag,
)
这是我的目标,
import pprint
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
pp = pprint.PrettyPrinter(indent=4)
args = {
'start_date': datetime.utcnow(),
'owner': 'dss admin',
}
dag = DAG(
dag_id='dss_trigger_target_dag',
default_args=args,
schedule_interval=None,
)
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='target_run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
最佳答案
您已在默认参数中添加了“start_date”作为 now(),该参数适用于每个任务。看来,这才是真正的罪魁祸首。 Airflow 建议不要使用它,因为它可以防止触发任务。 尝试将开始日期设置为过去的日期,例如airflow.utils.dates.days_ago(1)。
引用:https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
关于python-3.x - 外部触发 dag,其 Schedule_interval=None,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57160334/