python - 如何使用 Dask 从谷歌云存储中读取多个大型 CSV 文件的 block ,而不会使内存一次性重载

标签 python pandas postgresql dask dask-distributed

我正在尝试从 Google 存储中读取一堆大型 csv 文件(多个文件)。我使用 Dask 分发库进行并行计算,但我在这里面临的问题是,虽然我提到了 blocksize (100mb),但我不知道如何逐个分区读取并将其保存到我的 postgres 数据库中,这样我就不会重载内存。

    from dask.distributed import Client
    from dask.diagnostics import ProgressBar
    client = Client(processes=False)
    import dask.dataframe as dd

    def read_csv_gcs():
      with ProgressBar():
        df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
        pd = df.compute(scheduler='threads')
        return pd

    def write_df_to_db(df):
      try:
        from sqlalchemy import create_engine
        engine = create_engine('postgresql://usr:pass@localhost:5432/sampledb')
        df.to_sql('sampletable', engine, if_exists='replace',index=False)
      except Exception as e:
        print(e)
        pass

    pd = read_csv_gcs()
    write_df_to_db(pd)

上面的代码是我的基本实现,但正如我所说,我想分块读取它并更新数据库。类似的东西

    df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
    for chunk in df:
       write_it_to_db(chunk)

是否可以在Dask中做到这一点?或者我应该选择pandas的 block 大小并迭代,然后将其保存到数据库(但我在这里错过了并行计算)?

有人可以透露一些信息吗?

最佳答案

这一行

df.compute(scheduler='threads')

说:在工作线程中加载数据 block ,并将它们全部连接到一个内存中的数据帧,df。这不是你想要的。您希望在 block 出现时插入它们,然后将它们从内存中删除

您可能想使用map_partitions

df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
df.map_partitions(write_it_to_db).compute()

或使用df.to_delayed()

请注意,根据您的 SQL 驱动程序,您可能无法通过这种方式获得并行性,如果没有,pandas iter-chunk 方法也可以正常工作。

关于python - 如何使用 Dask 从谷歌云存储中读取多个大型 CSV 文件的 block ,而不会使内存一次性重载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56823593/

相关文章:

python - 值错误 : I/O operation on closed file with ffmpeg

python-3.x - 比较两个数据帧并根据匹配的列值从 df 中删除行

python - Pandas DatetimeIndex 与 to_datetime 差异

postgresql - Postgres 接受任何密码

swift - 我可以使用直接 SQL 来使用 Fluent (Vapor) 获取表中的行数吗?

python - 如何在 Python 上使用 "pip"安装 psycopg2?

python - 比较两个大的 csv 文件并用 python 编写另一个

python - CS50 Web 编程 - 导入 books.csv 文件时出现 Postgres SQL 错误

python - 填写缺失日期的快捷方式

SQL计数和过滤查询优化