python - Airflow - 在 DockerOperator 中使用 run_id 作为卷名称

标签 python docker airflow mount

我在 Apache Airflow 中编写了一个 DockerOperator,我想给它一个卷。到目前为止,一切都很好。这是一个例子:

t = DockerOperator(
        task_id='test',
        image='testimage:latest',
        command='python3 /code/test.py',
        volumes=["/mnt/interim:/interim"],
        xcom_push=True,
        dag=dag,
)

我遇到的问题如下:

挂载目录的名称需要灵活。因此,我想挂载一个名称中带有run_id的目录。

volumes=["/mnt/interim/" + "{{ run_id }}" + ":/interim"]

然而,Airflow 似乎无法解析卷中的“{{ run_id }}”,而只能解析 DockerOperator 的命令。

简而言之,我想获取 run_id 以便安装它。

请注意,使用 Airflow 变量( Airflow 的环境变量)不会解决问题,因为如果任务并行运行,该变量可能会被覆盖。

也许你们中的某个人已经知道可以做到这一点的高级 DockerOperator (CustomOperator)。

提前致谢:)

最佳答案

感谢 Johannes 提出问题。

您尝试实现的目标是可能的,但由于这不是一个非常常见的用例,因此默认情况下不会启用。 template_fields 可迭代中的参数由 Airflow 进行模板化。 volumes 字段不在那里,因此不会被拾取。

最简单的方法是复制项目中的 docker_operator.py,并将 volumes 字段添加到列表中:https://github.com/apache/airflow/blob/master/airflow/operators/docker_operator.py#L126 :

template_fields = ('command', 'environment', 'container_name', 'volumes',)

您还可以打开一个票证并将其合并到上游,但我不确定有多少用户会模板化此字段。希望这会有所帮助。

关于python - Airflow - 在 DockerOperator 中使用 run_id 作为卷名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59559743/

相关文章:

node.js - Docker buildx 与 Apple M1 芯片上的 Node 应用程序 - standard_init_linux.go :211: exec user process caused "exec format error

airflow - 回填/清除旧 DAG 时 dagrun_timeout 是否会干扰?

Python:取两个数据框的最大值来创建第三个数据框

python - 在 VBS 中使用 WshShell.Run 运行 Python 脚本不会生成输出文件

bash - 如何确定在 shell 启动期间打印的错误消息来自何处?

python - Apache Airflow 持续集成工作流和依赖管理

airflow - 使用Airflow进行批处理,根据父任务的输出动态启动多个任务

python - PyQt:计时器无法从另一个线程启动

python - str.title() 和 Mac 上的编码

docker - Bluemix中缺少定制的docker镜像