python - psycopg2 的 fast_executemany 替代方案

标签 python sqlalchemy

我有一个 Redshift 服务器,它是通过 psycopg2 启动的(请注意,公司服务器不支持 ODBC,因此我无法使用 pyodbc)。

目前通过 pd.to_sql() 处理 30-35k 行需要 10 多分钟,它从数据帧写入 Redshift DB。因此,作为一种解决方法,我将 DF 下载为 csv,将文件推送到 S3,然后使用 copy写入数据库。
fast_executemany按照 Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC 的解决方案本来是完美的 - 但是 psycopg2 不支持此功能.
我也找到了 d6tstack按照 https://github.com/d6t/d6tstack/blob/master/examples-sql.ipynb但是 pd_to_psql不适用于 Redshift,仅适用于 Postgresql(不能 copy... from stdin)

我可以为我的案例使用任何替代方案吗?

这是我的代码:

import sqlalchemy as sa

DATABASE = ""
USER = ""
PASSWORD = ""
HOST = "...us-east-1.redshift.amazonaws.com"
PORT = "5439"
SCHEMA = "public" 

server = "redshift+psycopg2://%s:%s@%s:%s/%s" % (USER,PASSWORD,HOST,str(PORT),DATABASE)
engine = sa.create_engine(server)
conn = engine.raw_connection()

with conn.cursor() as cur:
    cur.execute('truncate table_name')

df.to_sql('table_name', engine, index=False, if_exists='append') 

最佳答案

如果您无法使用 COPY from S3并且必须依赖DML,您可以尝试通过 use_batch_mode=True create_engine() :

engine = create_engine('theurl', use_batch_mode=True)

从这台机器向 Redshift 集群简单插入 500 行显示了启用批处理模式的合理改进:
In [31]: df = pd.DataFrame({'batchno': range(500)})

In [32]: %time df.to_sql('batch', engine, index=False, if_exists='append')
CPU times: user 87.8 ms, sys: 57.6 ms, total: 145 ms
Wall time: 1min 6s

In [33]: %time df.to_sql('batch', bm_engine, index=False, if_exists='append')
CPU times: user 10.3 ms, sys: 4.66 ms, total: 15 ms
Wall time: 9.96 s

请注意,Pandas 0.23.0 和 0.24.0 及更高版本不会从使用批处理模式中受益,因为如果底层 DBMS 支持,它们使用多值插入而不是 executemany。使用多值插入应该会在吞吐量上提供一些类似的改进,因为发出的查询更少。

关于python - psycopg2 的 fast_executemany 替代方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53656225/

相关文章:

python - 使用 lxml 解析包含默认命名空间的 xml 以获取元素值

python - pyplot 中的平移和缩放非常慢

python - Pycharm 警告 SqlAlchemy 模型中出现意外类型

python - 使用 SQLAlchemy 执行查询时避免参数绑定(bind)

python - 数据库设计中表的对应对象

python - SQLalchemy - 来自查询的模型

python - TensorFlow 变量无法初始化

python - 如何在 python3 ctypes 中获取 c_char_p 值作为字节?

Python:将全局对象声明为实例变量

python - 如何在 sqlalchemy 中正确关闭 mysql 连接?