python - 您将如何参数化 Dagster 管道以运行具有多个不同配置/ Assets 的相同实体?

标签 python dagster

假设我使用以下实体创建了一个 Dagster 管道:

  • 从文件执行 SQL 查询并获得结果
  • 将结果写入表

  • 我想对 10 个不同的表并行执行此操作。每个表都需要不同的 SQL 查询。
    最好的方法是什么?

    最佳答案

    一种方法是使用实​​体工厂。 run_query_solid_factorywrite_results_solid_factory是实体工厂,它们接受输入(例如名称和查询或表)并返回可以在管道中运行的实体。 summary_report在打印摘要信息之前等待所有上游实体完成。

    def run_query_solid_factory(name, query):
        @solid(name=name)
        def _run_query(context):
            context.log.info(query)
            return 'result'
    
        return _run_query
    
    def write_results_solid_factory(name, table):
        @solid(name=name)
        def _write_results(context, query_result):
            context.log.info(table)
            context.log.info(query_result)
            return 'success'
    
        return _write_results
    
    @solid
    def summary_report(context, statuses):
        context.log.info(' '.join(statuses))
    
    @pipeline
    def pipeline():
        solid_output_handles = []
        queries = [('table', 'query'), ('table2', 'query2')]
        for table, query in queries:
            get_data = run_query_solid_factory('run_query_{}'.format(query), query)
            write_results = write_results_solid_factory('write_results_to_table_{}'.format(table), table)
            solid_output_handles.append(write_results(get_data()))
    
        summary_report(solid_output_handles)
    

    Dag structure
    Dag execution logs

    上一个答案:

    我建议创建一个 composite_solid它由一个处理 (1) 的实体和一个处理 (2) 的实体组成。然后,您可以alias对 10 个表中的每一个进行复合实体一次,这将让您通过配置传递 SQL 查询(请参阅 tutorial )

    关于python - 您将如何参数化 Dagster 管道以运行具有多个不同配置/ Assets 的相同实体?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61330816/

    相关文章:

    python - 无法将输入发送到 Sublime Text 中正在运行的程序

    Python——输出文件中的数据位于不方便的位置

    python - 使用时间触发器在 Azure 中运行 python 脚本的选项

    python - 将不同的聚合函数应用于不同的列(现在不推荐使用重命名的字典)

    python - 生成 datetime.time 对象数组 python

    dagster - 如何在 dagster Assets /作业中指定/使用幂等 "date of execution"?

    python - 向实体函数添加附加参数

    python - 教程期间使用 dagster CLI 出现 DagsterUnmetExecutorRequirementsError

    python - 使用 Dagster 进行交叉验证

    python-3.x - 函数调用中的参数没有值