airflow - 如何在 Airflow 中格式化宏?

标签 airflow

我有以下内容:

EXEC_DATE1 = '{{ macros.ds_add(ds, 1) }}'


EXEC_DATE2 = '{{ execution_date }}'

我想创建如下所示的路径变量:

path1 = EXEC_DATE1 + '/' + HH:MM (of EXEC_DATE1)
path2 = EXEC_DATE2 + '/' + HH:MM (of EXEC_DATE2)

最终应该是这样的:

2018-09-16/10:41

我该怎么做?

我试过:

EXEC_DATE = '{{ execution_date }}'
EXEC_DATE = EXEC_DATE.strftime('%Y-%m-%d/%H:%M')

但它给出:

'str' object has no attribute 'strftime'

编辑: 我的代码:

EXEC_TIMESTAMP_PATH = "{{  execution_date.strftime('%Y-%m-%d/%H:%M') }}"
EXEC_DATE = "{{  execution_date.strftime('%H:%M') }}"
EXEC_TIME = "{{  mexecution_date.strftime('%Y-%m-%d') }}"

task3_op= BashOperator(
    task_id='task3',
    params={'EXEC_DATE':EXEC_DATE, 'EXEC_TIME':EXEC_TIME},
    bash_command="""python3 script.py '{{ var.value.task3_variable }}' '{{ params.EXEC_DATE }}' '{{ params.EXEC_TIME }}' 'file.json'""",
    dag=dag)

这行不通。不渲染参数。

最佳答案

您需要按如下方式进行:

EXEC_DATE = "{{ execution_date.strftime('%Y-%m-%d/%H:%M') }}"

strftime 应该在大括号内使用。

如果您需要下一个执行日期,请使用以下内容:

EXEC_DATE = "{{ next_execution_date.strftime('%Y-%m-%d/%H:%M') }}"

如果你只想添加一个timedelta:

EXEC_DATE = "{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d/%H:%M') }}"

你的代码可以如下:

BASH_COMMAND="""
python3 script.py {{ var.value.task3_variable }} {{  execution_date.strftime('%H:%M') }} {{  execution_date.strftime('%Y-%m-%d') }} file.json
"""

task3_op= BashOperator(
    task_id='task3',
    bash_command=BASH_COMMAND,
    dag=dag)

关于airflow - 如何在 Airflow 中格式化宏?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52352889/

相关文章:

python - 如何隐藏/屏蔽 Airflow 连接和可变截面的敏感数据?

python - 如何在 Airflow 2.x 中将 XComArg 转换为字符串值?

mysql - Airflow:Celery worker MySQL 连接过多

即使端点进程完成后,Airflow 任务仍在运行

airflow - 从应在 Airflow 中按顺序运行的函数返回任务列表

sqlalchemy - 从 Airflow 连接 postgres 数据库时出错

airflow - 删除调度程序文件夹中的 Airflow 日志

python - 如何从 Apache Airflow 使用 DockerOperator

pycharm - Windows PyCharm 中的 Airflow DAG 开发

python - systemd : `airflow.pid` vs `airflow-monitor.pid` 的 Airflow