python - 自动注册新的级长流程?

标签 python etl prefect

如果本地代理正在运行,是否有一种机制可以自动注册流/新流,而无需手动运行,例如flow.register(...) 每个?
在气流中,我相信他们有一个过程,可以定期扫描指定气流主文件夹中名称中带有 dag 的任何文件,然后在它们中搜索 DAG 对象。如果它找到它们,它就会加载它们,这样它们就可以通过 UI 访问,而无需手动“注册”它们。
知府是否存在类似的东西。因此,例如,如果我刚刚创建了以下文件 test_flow.py,而不必运行它或添加 flow.run_agent() 有没有办法让它神奇地注册并通过 UI 访问 :) - 仅仅通过它存在于适当的位置?

# prefect_home_folder/test_flow.py
import prefect
from prefect import task, Flow

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("hello-flow", tasks=[hello_task])

flow.register(project_name='main')
我可以编写一个与气流过程具有相似行为的脚本,以定期扫描文件夹并注册流量,但我想知道它是否有点笨拙,或者是否有更好的解决方案,我只是在考虑太多空气流动?

最佳答案

好问题(和很棒的用户名!) - 简而言之,我建议您在 Airflow 方面考虑太多。有几个原因目前在 Prefect 中不可用:

  • 显式优于隐式
  • Prefect 流不限于居住在一个地方,也不限于具有相同的运行时环境;这使得自动发现流 + 从单个代理进程重新序列化它变得复杂(不需要与它提交的流共享相同的运行时环境)
  • 代理最好被认为是由部署基础设施参数化的,而不是流存储

  • 理想情况下,对于生产工作流,您将使用 CI/CD 流程,以便在您更改代码时触发自 Action 业,重新注册流程。一些可能有用的评论:
  • 您实际上不需要为每个可能的代码更改重新注册流程;例如,如果您更改了 hello_task 在您的示例中记录的消息,您可以简单地将流重新保存到其原始位置(这取决于您使用的存储类型)。最终,您只需要在有关您的流程的任何元数据发生更改(重试设置、任务名称、依赖关系等)时重新注册
  • 你可以使用 flow.register("My Project", idempotency_key=flow.serialized_hash()) 自动捕获这个;如果流的后端表示以某种方式发生变化,则此模式只会注册新版本
  • 关于python - 自动注册新的级长流程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64973695/

    相关文章:

    python - 在 python 中打印长整数

    python - 在 Python Pandas 中查找前导零的记录

    sql - 在 SSIS csv 导出中去除日期/时间分隔符

    postgresql - 将数据从 SugarCRM 迁移到 Odoo?

    amazon-s3 - Redshift 中的维度建模和 ETL

    python - 是否可以循环 prefect.Parameter?

    python - 完善如何避免重新运行任务

    python - 将许多小 DataFrame 导出到单个 Excel 工作表

    python - 如何显示按外键分组的对象