python - Airflow :从上游任务访问模板字段

标签 python python-3.x jinja2 airflow

我有两个任务,一个是自定义运算符,它有一个模板字段 (snapshot_date_str),它将在“xcom”中设置该字段,另一个运算符是 S3Sensorbucket_key 需要在第一个任务中设置的模板字段。

Dag 定义:

SNAPSHOT_DATE = datetime.now().date()
S3_BUCKET = 'test-s3'
TENANT = 'test'

dag = DAG('template_fields_dag',
          default_args=default_args,
          schedule_interval='@hourly',
          concurrency=1,
          catchup=False)

t1 = ContextInitOperator(task_id='set_context', snapshot_date=SNAPSHOT_DATE, tenant=TENANT, dag=dag)

file_task = S3KeySensor(task_id="s3_file_sensor",
                        aws_conn_id='s3_connection',
                        bucket_key='test/{{ snapshot_date_str }}/abc.csv',
                        bucket_name=S3_BUCKET,
                        wildcard_match=True,
                        poke_interval=10,
                        timeout=60,
                        dag=dag)
t1 >> file_task

我的自定义 ContextInitOperator 在 xcom 中设置模板字段 snapshot_date_str

class ContextInitOperator(BaseOperator):

    template_fields = ('snapshot_date_str',)

    @apply_defaults
    def __init__(
            self,
            snapshot_date,
            *args, **kwargs):
        super(ContextInitOperator, self).__init__(*args, **kwargs)
        self.snapshot_date_str = snapshot_date.strftime('%Y-%m-%d')

    def execute(self, context):
        context['task_instance'].xcom_push(key='snapshot_date_str', value=self.snapshot_date_str)

bucket_key 需要路径中的 snapshot_date_str

我对 Python 和 Airflow 还不是很熟悉,我是不是漏掉了一些基本的东西?任何帮助将不胜感激。

最佳答案

来自 documentation ,您可能必须按照以下方式做一些事情

bucket_key="test/{{ task_instance.xcom_pull(task_ids='set_context', key='snapshot_date_str') }}/abc.csv"

关于python - Airflow :从上游任务访问模板字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50246342/

相关文章:

python - 访问 MIB 对象

python - Mac OS-X Mountain Lion 上的 GCC-4.2 错误,无法使用 pip/virtualenv 安装某些包

python-3.x - 不能用 pandas 删除 na 在 Python 中读取 excel 文件

python - openpyxl 检查行是否包含两个单独值的单元格

python - ansible/jinja2 获取独特的子元素

python - 尝试将对象转换为python中的int类型的问题

python - 为什么 python 的 datetime.datetime.strptime ('201412' , '%Y%m%d' ) 不引发 ValueError?

python - 如何在 python 中替换/删除字符串

javascript - 如何根据textContent值更改每个标签的CSS类

python - 多个 Django 模板加载器