python - 在调用 Airflow 测试时设置 dag_run.conf 参数

标签 python airflow

有谁知道在 bash 提示符下运行 airflow test 时是否可以设置 dag_run.conf 参数?

例如,我下载了 example_trigger_target_dag来自官方 Airflow 存储库,我想测试 run_this 任务。通常我会做以下事情:

~/$ Airflow 测试 example_trigger_target_dag run_this '2018-01-01'

但是运行它会产生错误:

--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2018-05-02 10:50:01,154] {models.py:1342} INFO - Executing <Task(PythonOperator): run_this> on 2018-01-01 00:00:00
[2018-05-02 10:50:01,262] {models.py:1417} ERROR - 'NoneType' object has no attribute 'conf'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 80, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/annalect/uk_ds_airflow/dags/playpen/example_trigger_target_dag.py", line 56, in run_this_func
    print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
AttributeError: 'NoneType' object has no attribute 'conf'

我尝试过使用 task_params 参数,但是我的语法错误或者它没有实现我想要的,因为它产生了与上面相同的错误:

~/$ airflow test --task_params '{"kwargs": {"dag_run": {"conf": {"message": "Hey world"}}}}' example_trigger_target_dag run_this '2018-01 -01'

[2018-05-02 11:10:58,065] {models.py:1441} INFO - Marking task as FAILED.
[2018-05-02 11:10:58,070] {models.py:1462} ERROR - 'NoneType' object has no attribute 'conf'

那么有人知道如何测试依赖于 dag_run.conf 值的任务吗?

谢谢!

最佳答案

airflow test 命令没有 --conf 选项,但您可以通过将参数传递给任务的 python_callable 来解决此问题。

在 callable 中,如果设置了 kwargs['test_mode'],您可以检索参数以构建一个虚拟的 DagRun 对象,如下所示:

from airflow.models import DagRun
...

def run_this_func(ds, **kwargs):
    if kwargs['test_mode']:
        kwargs['dag_run'] = DagRun(conf=kwargs['params'])

    print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))

要测试 example_trigger_target_dag,只需执行以下操作:

airflow test example_trigger_target_dag test_trigger_dagrun "2018-01-01" -tp '{"message":"Hello world"}'

你会得到:

Remotely received value of Hello world for key=message

现在您可以编写一个装饰器,而不是将测试代码放入您的任务中。此外,由于我们只是使用 DagRunconf 属性,我们也可以使用 SimpleNamespace。 最后,为了避免在查找 kwargs 时出现潜在的键错误,我们可以使用带有默认值的 get

from types import SimpleNamespace

def allow_conf_testing(func):
    def wrapper(*args, **kwargs):
        if kwargs.get('test_mode', False):
            kwargs['dag_run'] = SimpleNamespace(conf=kwargs.get('params', {}))
        func(*args, **kwargs)
    return wrapper

@allow_conf_testing
def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))

关于python - 在调用 Airflow 测试时设置 dag_run.conf 参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50132215/

相关文章:

airflow - 在 Google Composer 中删除 DAG - Airflow UI

python - Airflow DAG - 使用 SimpleHttpOperator 访问上下文以启用 XCOM 拉取

python - 如何处理 Cython ValueError

python - Keras Predict_proba 中的神经网络始终返回等于 1 的概率

python - 在命令行中终止python for循环程序而不关闭它?

python - Airflow:如何确保 DAG 每 5 分钟运行一次?

python - 强制 Airflow 以退出代码 1 终止

email - 我应该用什么方法发送带有 Airflow 的电子邮件?

python - 获取 Blobstore key

python - 使用 numpy 进行多元线性回归