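I want to generate dynamic tasks from the output of dynamic tasks. Each mapped task returns a list, and I want to create a separate mapped task for every element of that list, so the process would form a tree of mapped tasks.

Is it possible to expand the output of a dynamically mapped task, so that I get a chain of map operations instead of a map followed by a reduce?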
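To make the goal concrete, here is a plain-Python sketch (no Airflow involved) of the fan-out being asked for, using the same toy functions as the experiments: first yields two items, each second call yields three, so the desired result is six independent third calls rather than two.

```python
# Plain-Python model of the desired "tree" of mapped tasks (no Airflow).
# first -> 2 items, second fans each out to 3 items, third should run once per leaf.

def first() -> list[str]:
    return ["0", "1"]

def second(input: str) -> list[str]:
    return [f"{input}_{i}" for i in range(3)]

def third(input: str) -> str:
    return input

# Level 1: one "second" call per element of first()
second_outputs = [second(x) for x in first()]

# Level 2 (the goal): one "third" call per element of every second() output
third_outputs = [third(leaf) for out in second_outputs for leaf in out]

print(len(second_outputs))  # 2
print(third_outputs)        # ['0_0', '0_1', '0_2', '1_0', '1_1', '1_2']
```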
What I tried:
In my local environment I'm using:
Astronomer Runtime 9.6.0 based on Airflow 2.7.3+astro.2
Git Version: .release:9fad9363bb0e7520a991b5efe2c192bb3405b675
For the experiments I used three tasks that take a single string as input and return a list of strings.
1. Expanding a task group that contains expanded tasks (mapping a group with mapped tasks inside):
import datetime
import logging

from airflow.decorators import dag, task, task_group


@dag(schedule_interval=None, start_date=datetime.datetime(2023, 9, 27))
def try_dag3():
    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task_group
    def my_group(input: str) -> list[str]:
        @task
        def second(input: str) -> list[str]:
            logging.info(f"input: {input}")
            result = []
            for i in range(3):
                result.append(f"{input}_{i}")
            # ['0_0', '0_1', '0_2']
            # ['1_0', '1_1', '1_2']
            return result

        second_task = second.expand(input=first_task)

        @task
        def third(input: str, input1: str = None):
            logging.info(f"input: {input}, input1: {input1}")
            return input

        third_task = third.expand(input=second_task)

    my_group.expand(input=first_task)


try_dag3()
But this fails with NotImplementedError: operator expansion in an expanded task group is not yet supported
2. Expanding the result of an expanded task (mapping over a mapped task):
import datetime
import logging

from airflow.decorators import dag, task


@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag1():
    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"source: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")
        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    # this expands fine into two tasks from the list returned by first_task
    second_task = second.expand(input=first_task)

    @task
    def third(input: str):
        logging.info(f"source: {input}")
        return input

    # this doesn't expand - there are two mapped tasks, and the input value is a list, not a string
    third_task = third.expand(input=second_task)


try_dag1()
But the result of the second task is not expanded: each third instance receives a whole list of strings as its input.
third[0] task log:
[2024-01-05, 11:40:30 UTC] {try_dag1.py:30} INFO - source: ['0_0', '0_1', '0_2']
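The log is consistent with the following model: when .expand() is given the output of a mapped task, one downstream instance is created per upstream mapped instance, and each receives that instance's whole return value. A plain-Python sketch of that behaviour (an approximation inferred from the log, not Airflow's implementation):

```python
def second(input: str) -> list[str]:
    return [f"{input}_{i}" for i in range(3)]

def third(input):
    return input

# Return values of the two mapped "second" instances
second_outputs = [second(x) for x in ["0", "1"]]

# third.expand(input=second_task) maps over the *instances* of second,
# not over the items inside each instance's list, so each third call
# receives one whole list.
third_inputs = [third(out) for out in second_outputs]

print(len(third_inputs))  # 2
print(third_inputs[0])    # ['0_0', '0_1', '0_2'], the same value as in the third[0] log
```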
3. Expanding an expanded task together with a constant input (to test whether the structure is possible at all):
import datetime
import logging

from airflow.decorators import dag, task


@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag0():
    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"input: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")
        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    second_task = second.expand(input=first_task)

    @task
    def third(input: str, input1: str = None):
        logging.info(f"input: {input}, input1: {input1}")
        return input

    third_task = third.expand(input=second_task, input1=["a", "b", "c"])


try_dag0()
It looks like the mapped task can be expanded over the constant list passed to input1, but the input value is still an unexpanded list:
third[0] task log:
[2024-01-05, 12:51:39 UTC] {try_dag0.py:33} INFO - input: ['0_0', '0_1', '0_2'], input1: a
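Passing two arguments to .expand() makes Airflow create one instance per combination of the argument values (a cross product), which is why input1 appears to "expand" while input is still a whole list. A plain-Python sketch of that combination rule using itertools.product, with the values taken from the example above:

```python
from itertools import product

# Return values of the two mapped "second" instances (one list each)
second_outputs = [["0_0", "0_1", "0_2"], ["1_0", "1_1", "1_2"]]
input1_values = ["a", "b", "c"]

# One "third" call per (input, input1) pair: 2 x 3 = 6 calls,
# but every input is still a complete list, never a single string.
third_calls = [
    {"input": inp, "input1": inp1}
    for inp, inp1 in product(second_outputs, input1_values)
]

print(len(third_calls))  # 6
print(third_calls[0])    # {'input': ['0_0', '0_1', '0_2'], 'input1': 'a'}
```

The first combination in this model is the same pair that the third[0] log shows; the sketch does not claim anything further about how Airflow orders map indices.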
Best answer
You need to add a task that collects and flattens the results of second:
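import datetime
from itertools import chain

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime.datetime(2023, 9, 27))
def flatten_example():  # hypothetical DAG wrapper so the snippet runs on its own
    @task
    def first() -> list[str]:
        return ['1', '2']

    @task
    def second(input: str) -> list[str]:
        return [f"{input}_{i}" for i in ['1', '2', '3']]

    @task
    def second_collect(input: list[list[str]]) -> list[str]:
        # input holds every mapped second's return value; flatten them into one list
        return list(chain.from_iterable(input))

    @task
    def third(input: str) -> str:
        return f"Result: {input}!"

    sc = second_collect(second.expand(input=first()))
    third.expand(input=sc)

flatten_example()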
The result of second_collect is ['1_1', '1_2', '1_3', '2_1', '2_2', '2_3'], so third is mapped over these six values.
The results of the mapped third tasks are:
Result: 1_1!
Result: 1_2!
...
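The answer's pipeline can be checked without Airflow. The sketch below reproduces the same three steps as plain functions; second_collect receives the collection of all mapped second outputs (in Airflow this arrives as a lazily loaded, list-like sequence, which chain.from_iterable iterates just like a list).

```python
from itertools import chain

def first() -> list[str]:
    return ['1', '2']

def second(input: str) -> list[str]:
    return [f"{input}_{i}" for i in ['1', '2', '3']]

def second_collect(input: list[list[str]]) -> list[str]:
    # Flatten the per-instance lists into one list of leaves
    return list(chain.from_iterable(input))

def third(input: str) -> str:
    return f"Result: {input}!"

# second.expand(input=first()) -> one "second" call per element of first()
mapped_second = [second(x) for x in first()]
sc = second_collect(mapped_second)

# third.expand(input=sc) -> one "third" call per flattened element
results = [third(x) for x in sc]

print(sc)          # ['1_1', '1_2', '1_3', '2_1', '2_2', '2_3']
print(results[0])  # Result: 1_1!
```

The cost of this design is one extra, non-mapped task between the two mapped levels. It acts as a barrier: no third instance can start until every second instance has finished, but it is what turns the two-level tree into a single flat map.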
For the original question "python - Is it possible to build a tree structure of expanded Airflow DAG tasks? (dynamic task mapping on dynamic task mapping output)", see Stack Overflow: https://stackoverflow.com/questions/77766938/