我想检查我的表是否正确加载。如果未正确加载,则记录数将为零。我正在使用 SQLCheckOperator 来完成此任务。
这是代码
from airflow.operators.sql import SQLCheckOperator
from datetime import date, timedelta
CURRENT_DATE = str(date.today() - timedelta(2))
TABLE_NAME = "foo"
search_monolith_post_sanity = SQLCheckOperator(
task_id="search_monolith_post_sanity",
sql=f"SELECT COUNT(*) FROM `{TABLE_NAME}` WHERE feed_date = DATE_SUB('{CURRENT_DATE}', INTERVAL 1 DAY)",
bigquery_conn_id='bigquery_default',
use_legacy_sql=False,
dag=dag
)
我收到以下错误:
Executing SQL check: SELECT COUNT(*) FROM `foo` WHERE feed_date = DATE_SUB('2021-01-31', INTERVAL 1 DAY)
[2021-02-02 07:16:43,664] {taskinstance.py:1153} ERROR - 'NoneType' object has no attribute 'upper'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 986, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/operators/sql.py", line 95, in execut
records = self.get_db_hook().get_first(self.sql
File "/usr/local/lib/airflow/airflow/operators/sql.py", line 116, in get_db_hoo
return BaseHook.get_hook(conn_id=self.conn_id
File "/usr/local/lib/airflow/airflow/hooks/base_hook.py", line 94, in get_hoo
connection = cls.get_connection(conn_id
File "/usr/local/lib/airflow/airflow/hooks/base_hook.py", line 87, in get_connectio
conn = random.choice(list(cls.get_connections(conn_id))
File "/usr/local/lib/airflow/airflow/hooks/base_hook.py", line 83, in get_connection
return secrets.get_connections(conn_id
File "/usr/local/lib/airflow/airflow/secrets/__init__.py", line 55, in get_connection
conn_list = secrets_backend.get_connections(conn_id=conn_id
File "/usr/local/lib/airflow/airflow/secrets/base_secrets.py", line 64, in get_connection
conn_uri = self.get_conn_uri(conn_id=conn_id
File "/usr/local/lib/airflow/airflow/secrets/environment_variables.py", line 39, in get_conn_ur
environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper()
AttributeError: 'NoneType' object has no attribute 'upper
我尝试使用 BigQueryCheckOperator 和 CheckOperator 而不是 SQLCheckOperator,但遇到了错误。如果我用 BigQueryOperator 替换 BigQueryCheckOperator,代码可以正常工作,并且输出为零。
我是 Airflow 新手。任何帮助深表感谢。谢谢!!
最佳答案
如果您查看堆栈跟踪中错误消息之前的行。
environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper()
AttributeError: 'NoneType' object has no attribute 'upper'
在本例中,调用 upper()
的 NoneType
对象是 conn_id
。
如果您使用 Airflow 1.10.15 documentation因为这个操作符有一个相当重要的注释埋在底部
Note that this is an abstract class and get_db_hook needs to be defined. Whereas a get_db_hook is hook that gets a single record from an external source.
另请注意,该函数的定义似乎需要一个 conn_id
参数。
关于python - Airflow SQLCheckOperator 中的错误 - AttributeError : 'NoneType' object has no attribute 'upper' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66005381/