dagster - 如何避免在特定条件下运行 dagster 管道的其余部分

标签 dagster

假设我在 Dagster 中有两个实体连接在一条管道上。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为实现此结果,我在数据满足无效条件时引发错误,因此管道停止并跳过其余实体。

提出错误来解决我的用例似乎很老套,有没有一种方法可以在不诉诸异常的情况下跳过管道其余部分的执行?

from dagster import solid, pipeline

@solid
def solid_1(context, x: int):
    y = x + 1

    if y%2 == 0:
        raise "No even number is further processed"

    return y

@solid
def solid_2(context, y:int):
    return y**2

@pipeline
def toy_pipeline():
    solid_2(solid_1())

在这个非常人为的例子中,实体 2 应该只在第一个实体的输出为奇数时执行。

在我的实际用例中,第一个 solid 轮询数据库,有时找不到要处理的数据。在这种情况下,不将执行标记为失败而是标记为成功是有意义的。可以检查每个下游实体中的数据是否满足条件,但这会很快增加样板文件。当接收数据的实体发现没有数据要处理时,最好有一种方法跳过所有下游实体的执行。

最佳答案

要实现您想要的行为,可以使用相应 OutputDefinition 上的 is_required=False 参数将输出标记为可选。这意味着输出不一定必须由固体产生。

如果未产生可选输出,则依赖于输出的所有下游实体将简单地跳过。这对于短路管道(您的用例)或更复杂的分支逻辑都很有用。跳过实体时,管道运行不会标记为失败。

您使用类型提示来定义输入和输出类型,但由于您需要指定 is_required 参数,因此您需要使用显式 OuputDefinition

from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
from typing import List

def query_db():
    return []

@solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
def solid_1(context):
    rows = query_db()

    if len(rows) > 0:
        yield Output(rows, output_name="data")


@solid
def solid_2(context, data: List[int]):
    context.log.info(str(data))
    pass


@pipeline
def my_pipeline():
    solid_2(solid_1())

实体 solid_2 也可以使用 InputDefinition 而不是类型提示来定义。类型提示是 InputDefinitions 的语法糖:

@solid(input_defs=[InputDefinition('data', List[int])])
def solid_2(context, data):
    context.log.info(str(data))
    # Process data
    pass

作为旁注:一般来说,异常是将实体标记为失败的正确方法,并且在 Dagster 代码中不被视为 hacky。

关于dagster - 如何避免在特定条件下运行 dagster 管道的其余部分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62025039/

相关文章:

python - 使用 Dagster 进行交叉验证

python - 将 Dagster 与 Django 集成

Dagster 链接资源

python - 如何在复合实体中使用从其他实体产生的字典?

dagster - 如何指定在何处实现 dagster Assets ?

dagster - 如何在 dagster Assets /作业中指定/使用幂等 "date of execution"?

python - 向实体函数添加附加参数