我正在尝试使用 airflow
将数据从 S3 存储桶
复制到 Redshift 数据库
,这是我的代码:
from airflow.hooks import PostgresHook
path = 's3://my_bucket/my_file.csv'
redshift_hook = PostgresHook(postgres_conn_id='table_name')
access_key='abcd'
secret_key='aaaa'
query= """
copy my_table
FROM '%s'
ACCESS_KEY_ID '%s'
SECRET_ACCESS_KEY '%s'
REGION 'eu-west-1'
ACCEPTINVCHARS
IGNOREHEADER 1
FILLRECORD
CSV
BLANKSASNULL
EMPTYASNULL
MAXERROR 100
DATEFORMAT 'MM/DD/YYYY'
""" % ( path,
access_key,
secret_key)
redshift_hook.run(query)
但是当我运行此脚本时,它会引发以下错误:
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: connection [SQL: 'SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted \nFROM connection \nWHERE connection.conn_id = ?'] [parameters: ('elevaate_uk_production',)]
我可以得到一些帮助吗? 预先感谢您。
最佳答案
您的connection_id与表名相同吗?
您需要转到 http://………/admin/connections/上的 Airflow ui 并为您的 Redshift 集群添加一个 postgres 连接 ID。现在将连接 ID 的名称放入您写入 table_name
的位置。
当您定义一个 s3 连接并将访问权限和 key 放入其中时。通过连接 ID 名称实例化 SSHHook 来加载它,然后从中获取 key 。
最后将您的 ...run(query)
替换为 PostgresOperator
。将键放入参数字典中,然后在 SQL 字符串中使用:
from airflow.operators import PostgresOperator
form airflow.hooks import S3Hook
s3 = S3hook(aws_conn_id="s3_conn_id_in_airflow_ui_man")
redshift_load_task = PostgresOperator("""
copy my_table
FROM '{{ params.source }}'
ACCESS_KEY_ID '{{ params.access_key}}'
SECRET_ACCESS_KEY '{{ params.secret_key }}'
REGION 'eu-west-1'
ACCEPTINVCHARS
IGNOREHEADER 1
FILLRECORD
CSV
BLANKSASNULL
EMPTYASNULL
MAXERROR 100
DATEFORMAT 'MM/DD/YYYY'
""",
postgres_conn_id="redshift_conn_id_in_airflow_ui_man",
database="uh_you_tell_me",
params={
'source': 's3://my_bucket/my_file.csv',
'access_key': s3.get_credentials().access_key,
'secret_key': s3.get_credentials().secret_key,
},
)
关于python - 将数据从 Amazon s3 复制到 redshift,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49633643/