validation - 如何在 Airflow 中添加数据完整性逻辑

标签 validation airflow

我实现了 airflow ,我们可以添加数据完整性逻辑吗?假设我有 Task1 执行以下任务

1.Read the data from the data source--RAW DATA.
2. do join with dimensional table to get the some relation detail product name etc.
3. Store output file some location after step 2.

有一个 task2 将输出文件存储到数据库中。但在执行 task2 之前,我需要进行一些数据验证,例如 RAW DATA 的计数应等于存储输出文件计数,即加入后 像 count(raw_data) = count(raw_data_join_with_dimensional) ,如果它是 true 则触发 Task2 否则发送警报并使作业失败。

最佳答案

对于该用例,可能的工作流程可能是:

check_op = SQLCheckOperator(
    task_id='check_task',
    sql='YOUR VALIDATION SQL',
    conn_id='YOUR CONN',
)

t2_op = YourNextOperator()

failure_op = EmailOperator(subject='check has failed', to='YOUR EMAIL', trigger_rule='one_failed')

check_op >> [t2_op, failure_op]

它的工作原理如下:

  1. SQLCheckOperator 对数据库运行查询。如果查询返回 False,则检查失败,因此运算符将处于 Failure 状态。如果查询返回值,则查询视为成功,因此运算符将处于 Success 状态。
  2. 如果 SQLCheckOperator 状态为失败,将触发 EmailOperator,否则将触发 YourNextOperator

关于validation - 如何在 Airflow 中添加数据完整性逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64976593/

相关文章:

javascript - 带有密码强度检查的 jQuery 表单验证

Swift - 验证 UITextField

java - 针对多个 XSD 验证 XML

PHP preg_match 只允许数字、空格 '+' 和 '-'

python - 在 Airflow 宏中传递变量

Airflow :在 Airflow 中更改 DAG 的 crontab 时间

python - 如何在 Cloud Composer 环境中获取合适的凭据以调用 Google Sheets API?

automation - 使用分支逻辑在本地自动化工作流程

Airflow 1.10 - 任务之间的长时间延迟

angular - 如何让 FormControl 重新检查具有响应式表单的 FormGroup 的验证