python - Airflow SQLCheckOperator 中的错误 - AttributeError : 'NoneType' object has no attribute 'upper'

标签 python airflow airflow-scheduler

我想检查我的表是否正确加载。如果未正确加载,则记录数将为零。我正在使用 SQLCheckOperator 来完成此任务。

这是代码

from airflow.operators.sql import SQLCheckOperator
from datetime import date, timedelta

CURRENT_DATE = str(date.today() - timedelta(2))

TABLE_NAME = "foo"
search_monolith_post_sanity = SQLCheckOperator(
    task_id="search_monolith_post_sanity",
    sql=f"SELECT COUNT(*) FROM `{TABLE_NAME}` WHERE feed_date = DATE_SUB('{CURRENT_DATE}', INTERVAL 1 DAY)",
    bigquery_conn_id='bigquery_default',
    use_legacy_sql=False,
    dag=dag
)

我收到以下错误:

Executing SQL check: SELECT COUNT(*) FROM `foo` WHERE feed_date = DATE_SUB('2021-01-31', INTERVAL 1 DAY)
[2021-02-02 07:16:43,664] {taskinstance.py:1153} ERROR - 'NoneType' object has no attribute 'upper'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 986, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/operators/sql.py", line 95, in execut
  records = self.get_db_hook().get_first(self.sql
File "/usr/local/lib/airflow/airflow/operators/sql.py", line 116, in get_db_hoo
  return BaseHook.get_hook(conn_id=self.conn_id
File "/usr/local/lib/airflow/airflow/hooks/base_hook.py", line 94, in get_hoo
  connection = cls.get_connection(conn_id
File "/usr/local/lib/airflow/airflow/hooks/base_hook.py", line 87, in get_connectio
  conn = random.choice(list(cls.get_connections(conn_id))
File "/usr/local/lib/airflow/airflow/hooks/base_hook.py", line 83, in get_connection
  return secrets.get_connections(conn_id
File "/usr/local/lib/airflow/airflow/secrets/__init__.py", line 55, in get_connection
  conn_list = secrets_backend.get_connections(conn_id=conn_id
File "/usr/local/lib/airflow/airflow/secrets/base_secrets.py", line 64, in get_connection
  conn_uri = self.get_conn_uri(conn_id=conn_id
File "/usr/local/lib/airflow/airflow/secrets/environment_variables.py", line 39, in get_conn_ur
  environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper()
AttributeError: 'NoneType' object has no attribute 'upper

我尝试使用 BigQueryCheckOperator 和 CheckOperator 而不是 SQLCheckOperator,但遇到了错误。如果我用 BigQueryOperator 替换 BigQueryCheckOperator,代码可以正常工作,并且输出为零。

我是 Airflow 新手。任何帮助深表感谢。谢谢!!

最佳答案

如果您查看堆栈跟踪中错误消息之前的行。

environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper()
AttributeError: 'NoneType' object has no attribute 'upper'

在本例中,调用 upper()NoneType 对象是 conn_id

如果您使用 Airflow 1.10.15 documentation因为这个操作符有一个相当重要的注释埋在底部

Note that this is an abstract class and get_db_hook needs to be defined. Whereas a get_db_hook is hook that gets a single record from an external source.

另请注意,该函数的定义似乎需要一个 conn_id 参数。

关于python - Airflow SQLCheckOperator 中的错误 - AttributeError : 'NoneType' object has no attribute 'upper' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66005381/

相关文章:

python - 查询多个受让人组织的专利的PatentsView

python - "Desired Capabilities must be a dictionary"将 dict 传递到参数化方法时出错

python - 如何以不同的速率从 pandas 数据框中对每个组进行采样

Airflow 1.9.0 正在排队但未启动任务

ubuntu - 防止禁用的 DAG 在启用时立即运行

python - 如何将句子转换为向量

python - 如何在单元测试中测试 Airflow dag?

python - Airflow 从与执行文件相同的指定路径中的文件导入时出现问题

airflow - 将代码分发到 Airflow 网络服务器/调度程序+工作人员和工作流的最佳方式

airflow - Airflow 传感器中的模式 "reschedule"如何工作?