我工作的环境可以描述如下:
数据库以及我想从中提取的内容
运行分析所需的数据存储在单个非规范化(超过 100 列)Oracle 表中。财务报告数据每天都会发布到表中,并在报告日期进行范围分区(每天一个分区)。这是我打算运行的查询的结构:
SELECT col1,
col2,
col3
FROM table
WHERE date BETWEEN start_date AND end_date
使用 Dask 加载数据的策略
我正在使用sqlalchemy
与 cx_Oracle
驱动程序访问数据库。我遵循的与 Dask 并行加载数据的策略是:
from dask import bag as db
def read_rows(from_date, to_date, engine):
engine.dispose()
query = """
-- Query Text --
""".format(from_date, to_date)
with engine.connect() as conn:
ret = conn.execute(query).fetchall()
return ret
engine = create_engine(...) # initialise sqlalchemy engine
add_engine_pidguard(engine) # adding pidguard to engine
date_ranges = [...] # list of (start_date, end_date)-tuples
data_db = db.from_sequence(date_ranges)
.map(lambda x: read_rows(from_date=x[0], to_date=x[1], engine=engine)).concat()
# ---- further process data ----
...
add_engine_pidguard
取自 sqlalchemy 文档:How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?
问题
- 当前运行阻塞查询的方式是否良好 - 或者是否有更简洁的方式来实现此目的
sqlalchemy
? - 由于查询在
multiprocessing
中运行环境,管理引擎的方法是否符合其实现方式? - 当前我正在执行“原始查询”,从性能角度来看,在
declarative_base
中定义表是否有益? (具有各自的列类型)并使用session.query
在read_rows
中的所需列上?
最佳答案
我很乐意尝试按照以下方式编写代码
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
...
con = engine.connect()
df = dd.from_delayed([
delayed(pd.read_sql_query)(QUERY, con, params=params)
for params in date_ranges
])
在这种情况下,我只有一个连接——据我所知,cx_Oracle 连接能够被多个线程使用。数据是使用 dask.dataframe 加载的,除了线程调度程序之外,还没有执行任何操作。数据库 IO 和许多 pandas 操作都会释放 GIL,因此线程调度程序是一个很好的选择。
这将使我们直接跳转到拥有一个数据框,这对于结构化数据的许多操作来说都是很好的。
Currently I am executing a "raw query", would it be beneficial from a performance point of view to define the table in a
declarative_base
(with respective column types) and usesession.query
on the required columns from withinread_rows
?
据我了解,这不太可能提高性能。
关于sqlalchemy - 在 SQLAlchemy 中正确处理 Dask 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39120823/