etl - Why do I get a NoRegionError when using aws_athena_hook in Airflow?

Tags: etl boto3 airflow amazon-athena

I am trying to use Airflow to run a query in Athena on a schedule.

The function I have included below is run with a PythonOperator in Airflow.

from airflow.models import Variable
from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
import datetime

def update_athena_partition(*args, **kwargs):
    execution_date = datetime.datetime.strptime(kwargs['ds'], '%Y-%m-%d')
    execution_month = execution_date.month
    execution_year = execution_date.year
    s3_prefix = Variable.get('bikeshare_s3_prefix')
    bucket_name = Variable.get('bikeshare_bucket_name')
    athena_table_name = Variable.get('bikeshare_athena_table')
    result_configuration = {"OutputLocation": "s3://{}/".format(bucket_name)}
    file_location = 's3://bucket_name/' + s3_prefix + f'year=2018/month=2/'
    partition_update_query = """
    ALTER TABLE {} add partition (year="{}", month='{}')
    location "{}";
    """
    athena_hook = AWSAthenaHook(aws_conn_id='aws_credentials')
    athena_hook.run_query(partition_update_query.format(athena_table_name,
                                                        2018,
                                                        2,
                                                        file_location),
                          result_configuration=result_configuration,
                          query_context="athena_database_name")

This is my DAG:

etl_dag = DAG(
    'ETL_pipeline',
    start_date=datetime.datetime.now()

I have tried the query in the Athena GUI, and it runs fine there.

When I trigger a DAG run, this is the error I get:

*** Reading local file: /Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/airflow/logs/Bikeshare_ETL/Update_partition_schema.task/2019-08-13T02:06:13.072895+00:00/1.log
[2019-08-12 22:09:18,845] {taskinstance.py:616} INFO - Dependencies all met for <TaskInstance: Bikeshare_ETL.Update_partition_schema.task 2019-08-13T02:06:13.072895+00:00 [queued]>
[2019-08-12 22:09:18,856] {taskinstance.py:616} INFO - Dependencies all met for <TaskInstance: Bikeshare_ETL.Update_partition_schema.task 2019-08-13T02:06:13.072895+00:00 [queued]>
[2019-08-12 22:09:18,856] {taskinstance.py:834} INFO - 
--------------------------------------------------------------------------------
[2019-08-12 22:09:18,856] {taskinstance.py:835} INFO - Starting attempt 1 of 1
[2019-08-12 22:09:18,856] {taskinstance.py:836} INFO - 
--------------------------------------------------------------------------------
[2019-08-12 22:09:18,863] {taskinstance.py:855} INFO - Executing <Task(PythonOperator): Update_partition_schema.task> on 2019-08-13T02:06:13.072895+00:00
[2019-08-12 22:09:18,864] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'Bikeshare_ETL', 'Update_partition_schema.task', '2019-08-13T02:06:13.072895+00:00', '--job_id', '24', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/bikeshare_etl.py', '--cfg_path', '/var/folders/lq/9rht3n895x77gfzqtpwq2nbh0000gn/T/tmphp_qjc4o']
[2019-08-12 22:09:19,438] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task [2019-08-12 22:09:19,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-08-12 22:09:19,739] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task [2019-08-12 22:09:19,738] {dagbag.py:90} INFO - Filling up the DagBag from /Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/airflow/dags/bikeshare_etl.py
[2019-08-12 22:09:19,813] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task [2019-08-12 22:09:19,813] {cli.py:516} INFO - Running <TaskInstance: Bikeshare_ETL.Update_partition_schema.task 2019-08-13T02:06:13.072895+00:00 [running]> on host Vijays-MacBook-Pro.local
[2019-08-12 22:09:19,820] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=Bikeshare_ETL
AIRFLOW_CTX_TASK_ID=Update_partition_schema.task
AIRFLOW_CTX_EXECUTION_DATE=2019-08-13T02:06:13.072895+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2019-08-13T02:06:13.072895+00:00
[2019-08-12 22:09:19,871] {taskinstance.py:1047} ERROR - You must specify a region.
Traceback (most recent call last):
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/airflow/dags/bikeshare_etl.py", line 107, in update_athena_partition
    query_context="bikeshare_data")
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/contrib/hooks/aws_athena_hook.py", line 67, in run_query
    response = self.get_conn().start_query_execution(QueryString=query,
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/contrib/hooks/aws_athena_hook.py", line 50, in get_conn
    self.conn = self.get_client_type('athena')
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/contrib/hooks/aws_hook.py", line 170, in get_client_type
    config=config, verify=self.verify)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/session.py", line 839, in create_client
    client_config=config, api_version=api_version)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/client.py", line 86, in create_client
    verify, credentials, scoped_config, client_config, endpoint_bridge)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/client.py", line 328, in _get_client_args
    verify, credentials, scoped_config, client_config, endpoint_bridge)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/args.py", line 47, in get_client_args
    endpoint_url, is_secure, scoped_config)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/args.py", line 117, in compute_client_args
    service_name, region_name, endpoint_url, is_secure)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/client.py", line 402, in resolve
    service_name, region_name)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/regions.py", line 122, in construct_endpoint
    partition, service_name, region_name)
  File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/regions.py", line 135, in _endpoint_for_partition
    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.
[2019-08-12 22:09:19,875] {taskinstance.py:1078} INFO - Marking task as FAILED.
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task Traceback (most recent call last):
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/bin/airflow", line 32, in <module>
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     args.func(args)
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     return f(*args, **kwargs)
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/bin/cli.py", line 522, in run
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     _run(args, dag, ti)
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/bin/cli.py", line 440, in _run
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     pool=args.pool,
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     return func(*args, **kwargs)
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
[2019-08-12 22:09:19,888] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     result = task_copy.execute(context=context)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     return_value = self.execute_callable()
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/airflow/dags/bikeshare_etl.py", line 107, in update_athena_partition
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     query_context="bikeshare_data")
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/contrib/hooks/aws_athena_hook.py", line 67, in run_query
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     response = self.get_conn().start_query_execution(QueryString=query,
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/contrib/hooks/aws_athena_hook.py", line 50, in get_conn
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     self.conn = self.get_client_type('athena')
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/airflow/contrib/hooks/aws_hook.py", line 170, in get_client_type
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     config=config, verify=self.verify)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/boto3/session.py", line 263, in client
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     aws_session_token=aws_session_token, config=config)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/session.py", line 839, in create_client
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     client_config=config, api_version=api_version)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/client.py", line 86, in create_client
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     verify, credentials, scoped_config, client_config, endpoint_bridge)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/client.py", line 328, in _get_client_args
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     verify, credentials, scoped_config, client_config, endpoint_bridge)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/args.py", line 47, in get_client_args
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     endpoint_url, is_secure, scoped_config)
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/args.py", line 117, in compute_client_args
[2019-08-12 22:09:19,889] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     service_name, region_name, endpoint_url, is_secure)
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/client.py", line 402, in resolve
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     service_name, region_name)
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/regions.py", line 122, in construct_endpoint
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     partition, service_name, region_name)
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task   File "/Users/vj/Documents/My_Mac/Projects/etl_athena_redshift_airflow/.venv/lib/python3.7/site-packages/botocore/regions.py", line 135, in _endpoint_for_partition
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task     raise NoRegionError()
[2019-08-12 22:09:19,890] {base_task_runner.py:115} INFO - Job 24: Subtask Update_partition_schema.task botocore.exceptions.NoRegionError: You must specify a region.
[2019-08-12 22:09:23,848] {logging_mixin.py:95} INFO - [2019-08-12 22:09:23,847] {local_task_job.py:105} INFO - Task exited with return code 1

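Please help me figure out what I am missing.

Best answer

According to the log, you need to specify a region: botocore.exceptions.NoRegionError: You must specify a region.

I think this is probably related to how you set up the connection used in athena_hook = AWSAthenaHook(aws_conn_id='aws_credentials'). Check the connection with the ID aws_credentials; that is where you need to specify the region.

Under "Extra" you can enter the region information, for example {"region_name": "us-east-1"}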
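To make the suggestion above more concrete, here is a minimal sketch of what the Extra field contributes. It does not call Airflow itself; `region_from_extra` is a hypothetical helper written for illustration, on the assumption that the AWS hook reads `region_name` out of the connection's Extra JSON when no region is passed explicitly.

```python
import json


def region_from_extra(extra):
    """Return the region_name stored in a connection's Extra JSON, or None.

    Hypothetical helper: it only mimics looking up a 'region_name' key
    in the Extra field and is not part of Airflow's API.
    """
    if not extra:
        return None
    return json.loads(extra).get("region_name")


# Extra left empty: no region is found, so boto3 would have nothing to use.
missing = region_from_extra("")

# Extra filled in as suggested in the answer.
found = region_from_extra('{"region_name": "us-east-1"}')

print(missing, found)
```

If the first case matches your connection, adding the JSON from the second case to the Extra field of aws_credentials should give the hook a region to work with.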

Some code details on how aws_hook is called in Airflow.

Regarding "etl - Why do I get a NoRegionError when using aws_athena_hook in Airflow?", the original question is on Stack Overflow: https://stackoverflow.com/questions/57470403/
