<分区>
在 Airflow 中,我正在编写一个包含多个任务的 DAG,到目前为止都是 PythonOperators
。从 t1
我想在 xcom
字典中存储一个变量,然后在 t2
的函数中我想访问该变量而不显式调用任务名称(这需要在 t2 函数中对任务名称进行硬编码)。所以我的计划是访问 context['ti']
并使用具有属性 task_id
的 _get_previous_ti()
。这看起来像我想要的,但它绝对不适合我。
我试过这个:
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
def task1(**context):
return 'TASK 1 RESULT'
def task2(**context):
previous_ti = context['ti']._get_previous_ti()
print("Previous TI: ", previous_ti)
previous_ti_id = previous_ti.task_id
print("Previous task_id: ", previous_ti_id)
# use previous_ti_id to access context['ti'].xcom.pull(previous_ti_id)
return
default_args = {
"owner": "me",
"start_date": days_ago(1)
}
dag = DAG(
dag_id='some_test',
default_args=default_args,
schedule_interval=None)
with dag:
t1 = PythonOperator(
task_id = "task_1_testing",
python_callable=task1,
provide_context=True)
t2 = PythonOperator(
task_id = "task_2_testing",
python_callable=task2,
provide_context=True)
t1 >> t2
但这会产生奇怪的结果:当我第一次测试它时,airflow 已经在运行并且它似乎正在引用先前触发的 dag 运行(?)的任务实例。当我退出 Airflow 并使用这段代码重新启动它时,它给了我错误的爆炸屏幕:AttributeError: 'NoneType' object has no attribute 'dag_id'
。
我真正想要的是在 t2
函数中创建一个名为 previous_ti_id
的变量,在本例中它会返回等于 task_1_testing
.可能吗?
我找到了 this previous question ,但我对 Airflow 的了解还不够,无法确定这是否相关(虽然看起来并不相关)。我将不胜感激。