python - 在 Airflow 中的后续任务中获取先前任务 ID 的名称?

标签 python airflow

<分区>

在 Airflow 中,我正在编写一个包含多个任务的 DAG,到目前为止都是 PythonOperators。从 t1 我想在 xcom 字典中存储一个变量,然后在 t2 的函数中我想访问该变量而不显式调用任务名称(这需要在 t2 函数中对任务名称进行硬编码)。所以我的计划是访问 context['ti'] 并使用具有属性 task_id_get_previous_ti()。这看起来像我想要的,但它绝对不适合我。

我试过这个:

from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

def task1(**context):
    return 'TASK 1 RESULT'

def task2(**context):
    previous_ti = context['ti']._get_previous_ti()
    print("Previous TI: ", previous_ti)
    previous_ti_id = previous_ti.task_id
    print("Previous task_id: ", previous_ti_id)

    # use previous_ti_id to access context['ti'].xcom.pull(previous_ti_id)
    return

default_args = {
    "owner": "me",
    "start_date": days_ago(1)
}

dag = DAG(
    dag_id='some_test',
    default_args=default_args,
    schedule_interval=None)


with dag:

    t1 = PythonOperator(
        task_id = "task_1_testing",
        python_callable=task1,
        provide_context=True)

    t2 = PythonOperator(
        task_id = "task_2_testing",
        python_callable=task2,
        provide_context=True)
    
    t1 >> t2

但这会产生奇怪的结果:当我第一次测试它时,airflow 已经在运行并且它似乎正在引用先前触发的 dag 运行(?)的任务实例。当我退出 Airflow 并使用这段代码重新启动它时,它给了我错误的爆炸屏幕:AttributeError: 'NoneType' object has no attribute 'dag_id'

我真正想要的是在 t2 函数中创建一个名为 previous_ti_id 的变量,在本例中它会返回等于 task_1_testing .可能吗?

我找到了 this previous question ,但我对 Airflow 的了解还不够,无法确定这是否相关(虽然看起来并不相关)。我将不胜感激。

最佳答案

函数 _get_previous_ti() 返回上一个任务实例,这是同一个任务,但来自上一个任务运行。您正在寻找上游任务 ID,应该可以通过 upstream_list 获得这些 ID或 upstream_list_task_ids

也许还有this post帮助你。

关于python - 在 Airflow 中的后续任务中获取先前任务 ID 的名称?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64045530/

相关文章:

python - Python 装饰器做什么,它的代码在哪里?

python - 如何编写 Keras 自定义指标来过滤或屏蔽某些值?

kubernetes - Kubernetes 上的 Airflow : Errno 13 - Permission denied: '/opt/airflow/logs/scheduler

redis - 带有redis的 Airflow celery -6小时后超时

Python OpenCV cv.WaitKey 在 Ubuntu 模 256 map 上正确吐出奇怪的输出

python - 从最后可用数据创建 DataFrame 的最快方法

python - Airflow ,在创建时启用 dag

Airflow/Composer 推荐的文件夹结构

python - 如何在 Python 中解析 SRV 记录?

docker-compose env 文件在 yml 中工作但不使用命令行参数