parquet - 在单个多核机器上索引大型 dask 数据帧时的内存使用情况

标签 parquet dask dask-distributed fastparquet dask.distributed

我正在尝试将 Wikipedia CirrusSearch dump进入 Parquet 支持的 dask 数据帧,在 450G 16 核 GCP 实例上按标题索引。 CirrusSearch 转储为单个 json 行格式文件。 英文 Wipedia 转储包含 5M 记录,压缩后为 12G,扩展后为 90+G。 一个重要的细节是记录并不完全平坦。

最简单的方法是

import json
import dask
from  dask import bag as db, dataframe as ddf
from  toolz import curried as tz
from toolz.curried import operator as op

blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')

lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'

source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'

(
 db
 .read_text(source, blocksize=blocksize)
 .map(json.loads)
 .filter(tz.flip(op.contains, 'title'))
 .to_dataframe()
 .set_index('title', npartitions=npartitions)
 .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)

第一个问题是,使用默认调度程序时,它仅使用一个核心。通过显式使用分布式或多处理调度程序可以避免该问题。

我尝试过的所有调度程序和设置的更大问题是内存使用。看来 dask 在索引时尝试将整个数据帧加载到内存中。即使 450G 的 RAM 也不够。

  • 如何减少此任务的内存使用量?
  • 如何在不反复试验的情况下估算所需的最小内存?
  • 有更好的方法吗?

最佳答案

为什么 Dask 仅使用一个核心?

这部分的 JSON 解析可能是 GIL 绑定(bind)的,你想使用进程。然而,当您最终计算某些内容时,您将使用数据帧,通常假设计算会释放 GIL(这在 Pandas 中很常见),因此它默认使用线程后端。如果您主要受 GIL 解析阶段的约束,那么您可能想要使用多处理调度程序。这应该可以解决您的问题:

dask.config.set(scheduler='multiprocessing')

如何避免在 set_index 阶段使用内存

是的,set_index 计算需要完整的数据集。这是一个难题。如果您正在使用单机调度程序(您似乎正在这样做),那么它应该使用核外数据结构来执行此排序过程。我很惊讶它的内存不足。

如何在不反复试验的情况下估算所需的最小内存?

不幸的是,在任何语言中都很难估计内存中类似 JSON 的数据的大小。使用平面模式,这要容易得多。

有更好的方法吗?

这并不能解决您的核心问题,但您可以考虑在尝试对所有内容进行排序之前以 Parquet 格式暂存数据。然后尝试单独执行 dd.read_parquet(...).set_index(...).to_parquet(...) 。这可能有助于隔离一些成本。

关于parquet - 在单个多核机器上索引大型 dask 数据帧时的内存使用情况,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51094865/

相关文章:

java - 带有 Parquet 文件的 Hive 中的内存问题

hadoop - Spark Avro 到 Parquet Writer

apache-spark - 如何使用 Parquet 在spark中读取和写入同一文件?

python - 使用 dask 迭代填充 xarray 中的 NaN 值

python - 如何在不使用 Pandas 的情况下将 Numpy 转换为 Parquet?

python - 使用 dask 进行 3D 体处理

python - Dask groupby apply 行为异常

dask - 在群集上运行的 Dask 程序中找不到文件错误

python - 将 numpy 解决方案转换为 dask(numpy 索引在 dask 中不起作用)

python - 本地使用 dask : to Client() or not to Client()?