我有两个任务,一个是自定义运算符,它有一个模板字段 (snapshot_date_str
),它将在“xcom”中设置该字段,另一个运算符是 S3Sensor
和 bucket_key
需要在第一个任务中设置的模板字段。
Dag 定义:
SNAPSHOT_DATE = datetime.now().date()
S3_BUCKET = 'test-s3'
TENANT = 'test'
dag = DAG('template_fields_dag',
default_args=default_args,
schedule_interval='@hourly',
concurrency=1,
catchup=False)
t1 = ContextInitOperator(task_id='set_context', snapshot_date=SNAPSHOT_DATE, tenant=TENANT, dag=dag)
file_task = S3KeySensor(task_id="s3_file_sensor",
aws_conn_id='s3_connection',
bucket_key='test/{{ snapshot_date_str }}/abc.csv',
bucket_name=S3_BUCKET,
wildcard_match=True,
poke_interval=10,
timeout=60,
dag=dag)
t1 >> file_task
我的自定义 ContextInitOperator
在 xcom 中设置模板字段 snapshot_date_str
。
class ContextInitOperator(BaseOperator):
template_fields = ('snapshot_date_str',)
@apply_defaults
def __init__(
self,
snapshot_date,
*args, **kwargs):
super(ContextInitOperator, self).__init__(*args, **kwargs)
self.snapshot_date_str = snapshot_date.strftime('%Y-%m-%d')
def execute(self, context):
context['task_instance'].xcom_push(key='snapshot_date_str', value=self.snapshot_date_str)
bucket_key
需要路径中的 snapshot_date_str
。
我对 Python 和 Airflow 还不是很熟悉,我是不是漏掉了一些基本的东西?任何帮助将不胜感激。
最佳答案
来自 documentation ,您可能必须按照以下方式做一些事情
bucket_key="test/{{ task_instance.xcom_pull(task_ids='set_context', key='snapshot_date_str') }}/abc.csv"
关于python - Airflow :从上游任务访问模板字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50246342/