我正在尝试从 Google 存储中读取一堆大型 csv 文件(多个文件)。我使用 Dask 分发库进行并行计算,但我在这里面临的问题是,虽然我提到了 blocksize (100mb),但我不知道如何逐个分区读取并将其保存到我的 postgres 数据库中,这样我就不会重载内存。
from dask.distributed import Client
from dask.diagnostics import ProgressBar
client = Client(processes=False)
import dask.dataframe as dd
def read_csv_gcs():
with ProgressBar():
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
pd = df.compute(scheduler='threads')
return pd
def write_df_to_db(df):
try:
from sqlalchemy import create_engine
engine = create_engine('postgresql://usr:pass@localhost:5432/sampledb')
df.to_sql('sampletable', engine, if_exists='replace',index=False)
except Exception as e:
print(e)
pass
pd = read_csv_gcs()
write_df_to_db(pd)
上面的代码是我的基本实现,但正如我所说,我想分块读取它并更新数据库。类似的东西
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
for chunk in df:
write_it_to_db(chunk)
是否可以在Dask中做到这一点?或者我应该选择pandas的 block 大小并迭代,然后将其保存到数据库(但我在这里错过了并行计算)?
有人可以透露一些信息吗?
最佳答案
这一行
df.compute(scheduler='threads')
说:在工作线程中加载数据 block ,并将它们全部连接到一个内存中的数据帧,df
。这不是你想要的。您希望在 block 出现时插入它们,然后将它们从内存中删除。
您可能想使用map_partitions
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
df.map_partitions(write_it_to_db).compute()
或使用df.to_delayed()
。
请注意,根据您的 SQL 驱动程序,您可能无法通过这种方式获得并行性,如果没有,pandas iter-chunk 方法也可以正常工作。
关于python - 如何使用 Dask 从谷歌云存储中读取多个大型 CSV 文件的 block ,而不会使内存一次性重载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56823593/