I have the following code on Airflow 1.9:
import_op = MySqlToGoogleCloudStorageOperator(
    task_id='import',
    mysql_conn_id='oproduction',
    google_cloud_storage_conn_id='gcpm',
    provide_context=True,
    approx_max_file_size_bytes=100000000,  # 100 MB per file
    sql='import.sql',
    params={'next_to_import': NEXT_TO_IMPORT, 'table_name': TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name_orders,
    dag=dag)
Why does it generate:
/usr/local/lib/python2.7/dist-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to MySqlToGoogleCloudStorageOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'provide_context': True} category=PendingDeprecationWarning
What is wrong with provide_context? As far as I know, it is required when using params.
Best answer
params does not require provide_context. The params argument (of type dict) can be passed to any Operator.
You mainly use provide_context with PythonOperator and BranchPythonOperator. A good example is https://airflow.readthedocs.io/en/latest/howto/operator.html#pythonoperator .
MySqlToGoogleCloudStorageOperator has no provide_context parameter, so it is passed along in **kwargs, and you get the deprecation warning.
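The warning mechanism can be sketched in plain Python: an operator's __init__ only declares the arguments it actually supports, so anything else falls into **kwargs and triggers a PendingDeprecationWarning. This is a simplified stand-in, not Airflow's actual BaseOperator implementation:

```python
import warnings

class FakeOperator:
    """Simplified stand-in for an Airflow 1.x operator: warns on unknown kwargs."""
    def __init__(self, task_id, sql=None, bucket=None, **kwargs):
        if kwargs:
            warnings.warn(
                "Invalid arguments were passed to {c}. Support for passing such "
                "arguments will be dropped in Airflow 2.0. Invalid arguments were: "
                "**kwargs: {k}".format(c=type(self).__name__, k=kwargs),
                PendingDeprecationWarning,
            )
        self.task_id = task_id

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # provide_context is not a declared argument, so it lands in **kwargs
    FakeOperator(task_id='import', sql='import.sql', provide_context=True)

print(caught[0].category.__name__)                  # PendingDeprecationWarning
print('provide_context' in str(caught[0].message))  # True
```

Dropping provide_context=True from the MySqlToGoogleCloudStorageOperator call silences the warning without affecting params, which is accepted by every operator.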
If you check the docstring of provide_context in PythonOperator:
if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define
**kwargs
in your function header.
Looking at the source code, it contains the following:
if self.provide_context:
    context.update(self.op_kwargs)
    context['templates_dict'] = self.templates_dict
    self.op_kwargs = context
In short, it passes the following dictionary, together with templates_dict, to the function given in python_callable:
{
    'END_DATE': ds,
    'conf': configuration,
    'dag': task.dag,
    'dag_run': dag_run,
    'ds': ds,
    'ds_nodash': ds_nodash,
    'end_date': ds,
    'execution_date': self.execution_date,
    'latest_date': ds,
    'macros': macros,
    'params': params,
    'run_id': run_id,
    'tables': tables,
    'task': task,
    'task_instance': self,
    'task_instance_key_str': ti_key_str,
    'test_mode': self.test_mode,
    'ti': self,
    'tomorrow_ds': tomorrow_ds,
    'tomorrow_ds_nodash': tomorrow_ds_nodash,
    'ts': ts,
    'ts_nodash': ts_nodash,
    'yesterday_ds': yesterday_ds,
    'yesterday_ds_nodash': yesterday_ds_nodash,
}
So you can use it in your function like this:
def print_context(ds, **kwargs):
    pprint(kwargs)
    ti = kwargs['task_instance']
    exec_date = kwargs['execution_date']
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)
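What provide_context does for that callable can be mimicked outside Airflow. This is a minimal sketch: the context dict below is hand-built with placeholder values, not Airflow's real context, and the callable is a trimmed version of print_context:

```python
def print_context(ds, **kwargs):
    # With provide_context=True, Airflow calls the callable with the
    # template context spread out as keyword arguments; 'ds' binds to
    # the named parameter and everything else lands in **kwargs.
    ti = kwargs['task_instance']
    exec_date = kwargs['execution_date']
    return 'ds={0}, execution_date={1}'.format(ds, exec_date)

# Hand-built stand-ins for what PythonOperator.execute sets up:
op_kwargs = {}                       # the user's own op_kwargs
context = {                          # a tiny, fake subset of the real context
    'ds': '2019-01-01',
    'execution_date': '2019-01-01T00:00:00',
    'task_instance': '<TaskInstance: print_the_context>',
    'templates_dict': None,
}
context.update(op_kwargs)            # mirrors context.update(self.op_kwargs)
result = print_context(**context)    # mirrors calling python_callable(**op_kwargs)
print(result)                        # ds=2019-01-01, execution_date=2019-01-01T00:00:00
```

This also makes the failure mode clear: an operator like MySqlToGoogleCloudStorageOperator never performs this context merge, so provide_context=True has nothing to do there and is simply rejected as an unknown argument.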
Regarding the Airflow deprecation warning about invalid arguments being passed, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/53248759/