sqlalchemy - 在 SQLAlchemy 中正确处理 Dask 多处理

标签 sqlalchemy multiprocessing dask

我工作的环境可以描述如下:

数据库以及我想从中提取的内容

运行分析所需的数据存储在单个非规范化(超过 100 列)Oracle 表中。财务报告数据每天都会发布到表中,并在报告日期进行范围分区(每天一个分区)。这是我打算运行的查询的结构:

SELECT col1, 
       col2, 
       col3
FROM table
WHERE date BETWEEN start_date AND end_date

使用 Dask 加载数据的策略

我正在使用sqlalchemycx_Oracle驱动程序访问数据库。我遵循的与 Dask 并行加载数据的策略是:

from dask import bag as db

def read_rows(from_date, to_date, engine):
    engine.dispose()
    query = """
        -- Query Text --
    """.format(from_date, to_date)
    with engine.connect() as conn:
          ret = conn.execute(query).fetchall()
    return ret

engine = create_engine(...) # initialise sqlalchemy engine
add_engine_pidguard(engine) # adding pidguard to engine
date_ranges = [...] # list of (start_date, end_date)-tuples
data_db = db.from_sequence(date_ranges)
    .map(lambda x: read_rows(from_date=x[0], to_date=x[1], engine=engine)).concat()

# ---- further process data ----
...

add_engine_pidguard取自 sqlalchemy 文档:How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?

问题

  • 当前运行阻塞查询的方式是否良好 - 或者是否有更简洁的方式来实现此目的 sqlalchemy
  • 由于查询在 multiprocessing 中运行环境,管理引擎的方法是否符合其实现方式?
  • 当前我正在执行“原始查询”,从性能角度来看,在 declarative_base 中定义表是否有益? (具有各自的列类型)并使用 session.queryread_rows 中的所需列上?

最佳答案

我很乐意尝试按照以下方式编写代码

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

...

con = engine.connect()
df = dd.from_delayed([
    delayed(pd.read_sql_query)(QUERY, con, params=params)
    for params in date_ranges
])

在这种情况下,我只有一个连接——据我所知,cx_Oracle 连接能够被多个线程使用。数据是使用 dask.dataframe 加载的,除了线程调度程序之外,还没有执行任何操作。数据库 IO 和许多 pandas 操作都会释放 GIL,因此线程调度程序是一个很好的选择。

这将使我们直接跳转到拥有一个数据框,这对于结构化数据的许多操作来说都是很好的。


Currently I am executing a "raw query", would it be beneficial from a performance point of view to define the table in a declarative_base (with respective column types) and use session.query on the required columns from within read_rows?

据我了解,这不太可能提高性能。

关于sqlalchemy - 在 SQLAlchemy 中正确处理 Dask 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39120823/

相关文章:

python - 根据外部排名对 SQLAlchemy 查询结果重新排序

python - 运行 df.to_csv() 时出现 Dask 内存错误

python - 就地修改大于内存的Dask数组

python - 'Collection' object is not callable error in pymongo with process Pool

Python 多处理需要更多时间

python - 模块未找到错误: No module named 'dask.dataframe' ; 'dask' is not a package

python - 使用 SQLAlchemy 检索除部分列之外的所有列

python - SqlAlchemy:如何在 mysql 中制作 LONGBLOB 列?

python - SQLAlchemy Core 批量插入速度慢

python - 使用 Dataframe 进行多处理和队列