python - Copying data from Amazon S3 to Redshift

Tags: python amazon-web-services amazon-s3 amazon-redshift airflow

I am trying to copy data from an S3 bucket into a Redshift database using Airflow. Here is my code:

from airflow.hooks import PostgresHook
path = 's3://my_bucket/my_file.csv'

redshift_hook = PostgresHook(postgres_conn_id='table_name')
access_key='abcd' 
secret_key='aaaa'
query= """
copy my_table 
FROM '%s' 
ACCESS_KEY_ID '%s' 
SECRET_ACCESS_KEY '%s' 
REGION 'eu-west-1' 
ACCEPTINVCHARS 
IGNOREHEADER 1 
FILLRECORD 
CSV
BLANKSASNULL 
EMPTYASNULL 
MAXERROR 100 
DATEFORMAT 'MM/DD/YYYY'
""" % ( path,
        access_key,
        secret_key) 

redshift_hook.run(query)

But when I run this script, it raises the following error:

    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: connection [SQL: 'SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted \nFROM connection \nWHERE connection.conn_id = ?'] [parameters: ('elevaate_uk_production',)]

Can I get some help with this? Thanks in advance.

Best Answer

Is your connection_id the same as your table name? You need to go to the Airflow UI at http://………/admin/connections/ and add a postgres connection ID for your Redshift cluster. Then put that connection ID's name where you currently have table_name.
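
For example, a minimal sketch ('redshift_conn' is a hypothetical name for the connection ID you create in the UI):

from airflow.hooks import PostgresHook

# postgres_conn_id must name an Airflow connection, not a Redshift table
redshift_hook = PostgresHook(postgres_conn_id='redshift_conn')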

Next, define an S3 connection and put the access key and secret key in it. Load it by instantiating an S3Hook with that connection ID's name, then get the keys from it.
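
Something like this sketch, assuming an S3 connection named 's3_conn' has already been created in the UI (the name is a placeholder, and the shape of get_credentials() may differ across Airflow versions):

from airflow.hooks import S3Hook

# aws_conn_id must match the S3 connection defined in the Airflow UI
s3 = S3Hook(aws_conn_id='s3_conn')
credentials = s3.get_credentials()
access_key = credentials.access_key
secret_key = credentials.secret_key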

Finally, replace your ...run(query) with a PostgresOperator. Put the keys into the params dictionary and use them in the SQL string:

from airflow.operators import PostgresOperator
from airflow.hooks import S3Hook

# load the AWS keys from the S3 connection defined in the Airflow UI
s3 = S3Hook(aws_conn_id="s3_conn_id_in_airflow_ui_man")

redshift_load_task = PostgresOperator(
    task_id="redshift_load",  # every operator needs a unique task_id
    sql="""
copy my_table
FROM '{{ params.source }}'
ACCESS_KEY_ID '{{ params.access_key }}'
SECRET_ACCESS_KEY '{{ params.secret_key }}'
REGION 'eu-west-1'
ACCEPTINVCHARS
IGNOREHEADER 1
FILLRECORD
CSV
BLANKSASNULL
EMPTYASNULL
MAXERROR 100
DATEFORMAT 'MM/DD/YYYY'
""",
    postgres_conn_id="redshift_conn_id_in_airflow_ui_man",
    database="uh_you_tell_me",
    params={
        'source': 's3://my_bucket/my_file.csv',
        'access_key': s3.get_credentials().access_key,
        'secret_key': s3.get_credentials().secret_key,
    },
)
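
Note that the operator also has to belong to a DAG before Airflow will schedule it. A minimal sketch of that wiring, with a hypothetical DAG name, start date, and schedule:

from datetime import datetime
from airflow import DAG

# a hypothetical containing DAG; the name and schedule are placeholders
dag = DAG('s3_to_redshift', start_date=datetime(2018, 1, 1), schedule_interval='@daily')
redshift_load_task.dag = dag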

This question about copying data from Amazon S3 to Redshift with Python is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/49633643/
