Airflow deprecation warning: invalid arguments were passed

Tags: airflow

I have the following code on Airflow 1.9:

import_op = MySqlToGoogleCloudStorageOperator(
    task_id='import',
    mysql_conn_id='oproduction',
    google_cloud_storage_conn_id='gcpm',
    provide_context=True,
    approx_max_file_size_bytes=100000000,  # 100 MB per file
    sql='import.sql',
    params={'next_to_import': NEXT_TO_IMPORT, 'table_name': TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name_orders,
    dag=dag)

Why does it produce the following warning:

/usr/local/lib/python2.7/dist-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to MySqlToGoogleCloudStorageOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'provide_context': True} category=PendingDeprecationWarning

What is wrong with provide_context? As far as I know, it is required in order to use params.

Best Answer

params does not require provide_context.

The params argument (a dict) can be passed to any Operator.
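What params actually does is get merged into the Jinja template context, so you can reference it from any templated field (such as sql here). Below is a minimal, self-contained sketch of that rendering step; it uses a toy regex substitution in place of the real Jinja engine, and render_template and its context layout are illustrative, not Airflow's API:

```python
import re

def render_template(template, context):
    """Toy stand-in for the Jinja rendering Airflow applies to
    templated fields like `sql` before the task runs."""
    def repl(match):
        # Resolve dotted lookups such as params.table_name
        value = context
        for part in match.group(1).strip().split('.'):
            value = value[part]
        return str(value)
    return re.sub(r'\{\{([^}]+)\}\}', repl, template)

# params is part of the template context for any operator;
# no provide_context is needed for this to work.
sql = 'SELECT * FROM {{ params.table_name }} WHERE id > {{ params.next_to_import }}'
context = {'params': {'table_name': 'orders', 'next_to_import': 42}}
print(render_template(sql, context))  # SELECT * FROM orders WHERE id > 42
```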

You mainly use provide_context with PythonOperator and BranchPythonOperator. A good example is https://airflow.readthedocs.io/en/latest/howto/operator.html#pythonoperator.

MySqlToGoogleCloudStorageOperator has no provide_context parameter, so the argument falls through into **kwargs, and you get the deprecation warning.
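A rough, self-contained sketch of that mechanism (the Fake* classes are hypothetical stand-ins, not Airflow's code): any keyword argument that no __init__ in the operator's class chain claims falls through to **kwargs at the base, which is what Airflow 1.x warns about:

```python
import warnings

class FakeBaseOperator(object):
    """Hypothetical stand-in for Airflow 1.x BaseOperator: it warns
    about any keyword arguments left over after subclasses took theirs."""
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        if kwargs:
            warnings.warn(
                'Invalid arguments were passed to {}. Support for passing '
                'such arguments will be dropped in Airflow 2.0. Invalid '
                'arguments were: **kwargs: {}'.format(
                    type(self).__name__, kwargs),
                PendingDeprecationWarning)

class FakeMySqlToGCSOperator(FakeBaseOperator):
    """provide_context is not in this signature, so it falls through
    to **kwargs and reaches the base class unconsumed."""
    def __init__(self, task_id, sql=None, **kwargs):
        self.sql = sql
        super(FakeMySqlToGCSOperator, self).__init__(task_id, **kwargs)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    FakeMySqlToGCSOperator(task_id='import', sql='import.sql',
                           provide_context=True)
print(caught[0].message)
```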

If you check provide_context in the docstring of PythonOperator:

if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.

Looking at the source, there is the following code:

if self.provide_context:
    context.update(self.op_kwargs)
    context['templates_dict'] = self.templates_dict
    self.op_kwargs = context
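In effect, when provide_context=True the template context is merged with op_kwargs and the whole thing is passed as keyword arguments to the callable. A runnable sketch of that dispatch (execute_callable is a hypothetical helper written for illustration, not Airflow's API):

```python
def execute_callable(python_callable, op_kwargs, context, provide_context):
    """Simplified sketch of how PythonOperator calls its callable:
    with provide_context=True the template context becomes the
    keyword arguments, with user-supplied op_kwargs merged on top."""
    if provide_context:
        merged = dict(context)
        merged.update(op_kwargs)
        op_kwargs = merged
    return python_callable(**op_kwargs)

def show(ds, **kwargs):
    # 'ds' is picked out by name; everything else lands in **kwargs.
    return ds, sorted(kwargs)

result = execute_callable(
    show,
    op_kwargs={},
    context={'ds': '2018-11-01', 'ti': '<TaskInstance>'},
    provide_context=True)
print(result)  # ('2018-11-01', ['ti'])
```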

Put simply, it passes the dict below (plus templates_dict) as keyword arguments to the function given in python_callable:

{
    'END_DATE': ds,
    'conf': configuration,
    'dag': task.dag,
    'dag_run': dag_run,
    'ds': ds,
    'ds_nodash': ds_nodash,
    'end_date': ds,
    'execution_date': self.execution_date,
    'latest_date': ds,
    'macros': macros,
    'params': params,
    'run_id': run_id,
    'tables': tables,
    'task': task,
    'task_instance': self,
    'task_instance_key_str': ti_key_str,
    'test_mode': self.test_mode,
    'ti': self,
    'tomorrow_ds': tomorrow_ds,
    'tomorrow_ds_nodash': tomorrow_ds_nodash,
    'ts': ts,
    'ts_nodash': ts_nodash,
    'yesterday_ds': yesterday_ds,
    'yesterday_ds_nodash': yesterday_ds_nodash,
}

So you can use it in your function like this:

from pprint import pprint

def print_context(ds, **kwargs):
    pprint(kwargs)
    ti = kwargs['task_instance']
    exec_date = kwargs['execution_date']
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)

Regarding this Airflow deprecation warning about invalid arguments, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/53248759/
