我尝试向 GoogleCloudStorageToBigQueryOperator
的 source_objects
字段提供一个字符串列表,但使用以下代码时出现错误:
string indices must be integers, not unicode
我不知道的事情:
- 如何在 DAG 范围内使用 XCOM 获取
get_file_name
的返回
值? - 如何在 DAG 范围内调用
xcom_pull
函数而无需提供上下文?在我看来,任务实例不需要提供上下文。
我想到的事情:
- 重写运算符并采用 XCOM 作为参数
我想做的事:
- 我希望能够调用接线员
此外,似乎某些运算符的字段使用了名为 templated_field
的功能,模板字段背后的机制是什么?不仅仅适用于 PythonOperator
和 BashOperator
?
最后一个问题,为什么 PythonOperator
不返回 TaskInstance
?
with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:
def get_file_name_from_conf(ds, **kwargs):
fileName = kwargs['dag_run'].conf['fileName']
return [fileName]
get_file_name = PythonOperator(
task_id='get_file_name',
provide_context=True,
python_callable=get_file_name_from_conf)
bq_load = GoogleCloudStorageToBigQueryOperator(
task_id='bq_load',
bucket='src_bucket',
#source_objects=['data.csv'],
source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'),
destination_project_dataset_table='project:dataset.table',
write_disposition='WRITE_EMPTY')
bq_load.set_upstream(get_file_name)
我对 Python 和 Airflow 有点陌生。我想这些事情应该是微不足道的。我确信我在这里误解了一些东西。
最佳答案
经过多次测试,我想出了这个解决方案,感谢 tobi6 的评论,为我指明了正确的方向。我必须使用template_fields功能。
当我尝试返回包含单个字符串的列表时,出现连接错误,因此我必须在 XCOM 中返回单个字符串,并用方括号将模板调用括起来,以使结果成为列表。
这是最终的代码:
with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:
def get_file_name_from_conf(ds, **kwargs):
return kwargs['dag_run'].conf['fileName']
get_file_name = PythonOperator(
task_id='get_file_name',
provide_context=True,
python_callable=get_file_name_from_conf)
bq_load = GoogleCloudStorageToBigQueryOperator(
task_id='bq_load',
bucket='src_bucket',
source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
destination_project_dataset_table='project:dataset.table',
write_disposition='WRITE_APPEND')
bq_load.set_upstream(get_file_name)
关于python - 将返回值从运算符传递给 Airflow 中的后续运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51892190/