python - 是否可以构建扩展 Airflow DAG 任务的树形结构? (动态任务映射输出上的动态任务映射)

标签 python airflow airflow-2.x

我想从动态任务输出生成动态任务。每个映射任务都会返回一个列表,我想为列表中的每个元素创建一个单独的映射任务,因此该过程将如下所示: Airflow dynamic task tree 是否可以扩展动态映射任务的输出,从而产生一系列映射操作而不是映射,然后进行归约?

我尝试过的:

在我的本地环境中,我正在使用:

Astronomer Runtime 9.6.0 based on Airflow 2.7.3+astro.2
Git Version: .release:9fad9363bb0e7520a991b5efe2c192bb3405b675

为了进行实验,我使用了三个任务,其中单个字符串作为输入,字符串列表作为输出。

1。展开具有扩展任务的组(映射具有映射任务的组):

import datetime
import logging

from airflow.decorators import dag, task, task_group

@dag(schedule_interval=None, start_date=datetime.datetime(2023, 9, 27))
def try_dag3():

    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task_group
    def my_group(input: str) -> list[str]:
    
        @task
        def second(input: str) -> list[str]:
            logging.info(f"input: {input}")
            result = []
            for i in range(3):
                result.append(f"{input}_{i}")

            # ['0_0', '0_1', '0_2']
            # ['1_0', '1_1', '1_2']
            return result

        second_task = second.expand(input=first_task)

        @task
        def third(input: str, input1: str = None):
            logging.info(f"input: {input}, input1: {input1}")
            return input

        third_task = third.expand(input=second_task)
        
    my_group.expand(input=first_task)

try_dag3()

但它会导致NotImplementedError:尚不支持扩展任务组中的运算符扩展

2。展开扩展的任务结果(映射到映射的任务):

import datetime
import logging

from airflow.decorators import dag, task

@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag1():

    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"source: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")

        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    # this expands fine into two tasks from the list returned by first_task
    second_task = second.expand(input=first_task)

    @task
    def third(input: str):
        logging.info(f"source: {input}")
        return input

    # this doesn't expand - there are two mapped tasks, and the input value is a list, not a string
    third_task = third.expand(input=second_task)


try_dag1()

但是 second dag 的结果没有展开,而 third 任务输入是一个字符串列表: dag1 graph third[0]任务日志: [2024-01-05, 11:40:30 UTC] {try_dag1.py:30} 信息 - 来源:['0_0', '0_1', '0_2']

3。使用 const 输入扩展扩展任务(以测试结构是否可行):

import datetime
import logging

from airflow.decorators import dag, task

@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag0():

    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"input: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")

        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    second_task = second.expand(input=first_task)

    @task
    def third(input: str, input1: str = None):
        logging.info(f"input: {input}, input1: {input1}")
        return input

    third_task = third.expand(input=second_task, input1=["a", "b", "c"])


try_dag0()

看起来映射的任务可以通过传递给 input1 的常量列表进行扩展,但 input 值是一个非扩展列表: dag0 graph third[0]任务日志: [2024-01-05, 12:51:39 UTC] {try_dag0.py:33} 信息 - 输入:['0_0', '0_1', '0_2'], input1: a

最佳答案

您需要添加一个任务来收集并展平第二个的结果。

@task
def first() -> list[str]:
    return ['1', '2']

@task
def second(input: str) -> list[str]:
    return [f"{input}_{i}" for i in ['1', '2', '3']]

@task
def second_collect(input: list[list[str]]) -> list[str]:
    return list(chain.from_iterable(input))

@task
def third(input: str) -> str:
    return f"Result: {input}!"

sc = second_collect(second.expand(input=first()))
third.expand(input=sc)

enter image description here

second_collect 的结果为 ['1_1', '1_2', '1_3', '2_1', '2_2', '2_3'] (映射的任务)。

第三个映射任务的结果是:

  • 结果:1​​_1!
  • 结果:1​​_2!
  • ...

关于python - 是否可以构建扩展 Airflow DAG 任务的树形结构? (动态任务映射输出上的动态任务映射),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77766938/

相关文章:

python - 获取分组中具有最大值的行

python 3.6 selenium webdriver 错误 X display is required for sending-keys unable to ues Xvfb

airflow - Google Cloud Composer 服务器遇到临时错误,无法完成您的请求

airflow - 读取 XCOM 和 Airflow 变量可能会减慢 Airflow(在 Google Cloud Composer 中)

python-3.x - Dag 可以读取 CSV 行作为运算符(operator)的输入

python - 如何在 python 中将宇宙数据的时间戳转换为日期时间格式?

一定范围内整数的 Python numpy 乘积

docker - dockerized算法的机器学习DAG,Luigi/Airflow/Celery

python - 升级 AWS Apache Airflow (MWAA) 中的 python 版本

python - 如何删除 Airflow 中的下游或上游任务依赖性