针对 BigQuery 运行 Airflow 集成测试的好/推荐方法是什么?
Airflow 似乎有不少专为测试而设计的运算符,例如 BigQueryCheckOperator
。我正在努力寻找任何最佳实践或示例来展示如何将其用于集成测试。
推荐的集成测试方法是什么:
- 具体任务
- 整个 DAG
最好使用用 Python 编写的测试框架,因为 Airflow 也是用 Python 编写的,例如 pytest
或类似框架。
最佳答案
您可以像这样对整个 dag 运行测试:
import unittest
from airflow.models import DagBag
class TestDagIntegrity(unittest.TestCase):
LOAD_SECOND_THRESHOLD = 2
def setUp(self):
self.dagbag = DagBag()
def test_import_dags(self):
""" Test if dags works, no fail in import
"""
self.assertFalse(
len(self.dagbag.import_errors),
'DAG import failures. Errors: {}'.format(
self.dagbag.import_errors
)
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
unittest.TextTestRunner(verbosity=2).run(suite)
如果没有导入失败或缺少变量,将测试您的 dags 是否有效
关于python - 与 Airflow 的集成测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50052065/