我实现了 airflow ,我们可以添加数据完整性逻辑吗?假设我有 Task1 执行以下任务
1.Read the data from the data source--RAW DATA.
2. do join with dimensional table to get the some relation detail product name etc.
3. Store output file some location after step 2.
有一个 task2 将输出文件存储到数据库中。但在执行 task2 之前,我需要进行一些数据验证,例如 RAW DATA 的计数应等于存储输出文件计数,即加入后 像 count(raw_data) = count(raw_data_join_with_dimensional) ,如果它是 true 则触发 Task2 否则发送警报并使作业失败。
最佳答案
对于该用例,可能的工作流程可能是:
check_op = SQLCheckOperator(
task_id='check_task',
sql='YOUR VALIDATION SQL',
conn_id='YOUR CONN',
)
t2_op = YourNextOperator()
failure_op = EmailOperator(subject='check has failed', to='YOUR EMAIL', trigger_rule='one_failed')
check_op >> [t2_op, failure_op]
它的工作原理如下:
SQLCheckOperator
对数据库运行查询。如果查询返回False
,则检查失败,因此运算符将处于Failure
状态。如果查询返回值,则查询视为成功,因此运算符将处于Success
状态。- 如果
SQLCheckOperator
状态为失败,将触发 EmailOperator,否则将触发YourNextOperator
。
关于validation - 如何在 Airflow 中添加数据完整性逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64976593/