python - 无法在带有 Airflow 的 Jinja 模板中使用 Python 变量

标签 python amazon-web-services airflow amazon-emr mwaa

我正在尝试使用 Airflow 在 AWS EMR 上运行 11 步并遵循此 code作为引用。因为对 11 个步骤使用 EmrAddStepsOperator 和 EmrStepSensor 会重复太多。所以我试图遍历它。我在我的 DAG 中使用了以下代码。

step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))

我在这里遇到一个错误,EmrStepSensor 期望 EMR 的 step_id 在这里输入,并且是从 xcom 获取生成的(我想,我不是 100% 确定这段代码是如何工作的)。但是我的步骤存储在步骤列表中,所以我不能在 step_id 的 task_id 中给出静态值,就像在引用代码中给出的那样,我无法弄清楚如何使用带有 python 变量值的 jinja 模板来将值放在这里从步骤列表。

我使用了以下两种方式,以便step_id可以根据steps[i]中的步骤名称从EMR中获取正确的步骤

step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",

step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")

然而,这两个都因 Airflow 中的语法错误而失败。因此,如果有人能指出我正确的方向来做到这一点,我将非常感激。我使用的是 Airflow 1.10.12(这是 AWS 上 Managed Apache Airflow 中 Airflow 的默认版本)。

最佳答案

我不确定这是否已经解决,所以:

使用 f 字符串:

f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"

使用 .format: “{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}”.format(steps[i])

请注意,您必须确保键 task_ids 的值用单引号引起来。此外,从 xcom_pull 返回的是一个列表,因此索引 [0] 位于 o

关于python - 无法在带有 Airflow 的 Jinja 模板中使用 Python 变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66026406/

相关文章:

python - vim-python-模式 : No module named __future__

amazon-web-services - 亚马逊 EKS : Setting up worker nodes on spot instances

airflow - 如何修复错误 "AirflowException("作业运行器的主机名不匹配“)”?

python - 如何进一步优化这个文本匹配功能?

python - SqlAlchemy UserDefinedType 基于内置类型,带有附加方法

python - 多次导入一个模块有多安全?

amazon-web-services - 当托管区域是父帐户的一部分时,CDK DnsValidatedCertificate : Can create a certificate in a linked AWS account,?

tomcat - Grails 插件安装错误 : java. lang.NoClassDefFoundError: org/grails/plugins/tomcat/fork/ForkedTomcatServer$_findSystemClasspathJars_closure3

python - 如何在 Airflow 中有多个分支?

airflow - AIRFLOW -UI 中缺少 DataProfiling View