tl;博士
我想要
dd.read_parquet('*.parq')['column'].nunique().compute()
但我明白了
WARNING - Worker exceeded 95% memory budget. Restarting
在 worker 全部被杀之前有几次。
长版
我有一个数据集
- 100 亿行,
- ~20 列,
以及一台内存约为 200GB 的机器。我正在尝试使用 dask 的 LocalCluster 来处理数据,但即使我使用相当小的子集并尝试使用基本操作,我的工作人员很快就会超出其内存预算并被杀死。
我重新创建了一个玩具问题来演示以下问题。
合成数据
为了在较小的范围内近似解决上述问题,我将创建一个包含 32 个字符 id 的单列
- 一百万个唯一 ID
- 总长度2亿行
- 分割成 100 个
parquet
文件
结果将是
- 100 个文件,每个
66MB
,作为 Pandas 数据帧加载时占用178MB
(通过df.memory_usage(deep=True).sum() 估计)
) - 如果作为 pandas 数据帧加载,则所有数据在内存中占用
20GB
- 一个包含所有 ID 的系列(我认为工作人员在计算
nunique
时也必须保留在内存中)大约需要90MB
import string
import os
import numpy as np
import pandas as pd
chars = string.ascii_letters + string.digits
n_total = int(2e8)
n_unique = int(1e6)
# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)
outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)
# Sample from the ids to create 100 parquet files
for i in range(100):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
尝试解决方案
假设我的机器只有 8GB
内存。根据 Wes Kinney 的 rule of thumb 的说法,由于分区大约需要 178MB
且结果 90MB
,我可能需要高达 2-3Gb 的内存。因此,要么
n_workers=2、memory_limit='4GB'
或n_workers_1,memroy_limit='8GB'
似乎是一个不错的选择。可悲的是,当我尝试时,我得到了
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
几次,然后 worker 就被彻底杀死了。
import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)
dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()
事实上,例如,对于 4
个工作人员,他们每个人都需要 6GB
内存才能执行任务。
这种情况可以改善吗?
最佳答案
这是反复出现的问题的一个很好的例子。唯一让人震惊的是delayed
在合成数据创建期间未使用:
import dask
@dask.delayed
def create_sample(i):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
return
# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)
对于以下答案,我实际上只会使用少量分区(因此更改为 range(5)
),以获得合理的可视化效果。让我们从加载开始:
df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), use_cols=['id'])
print(df.npartitions) # 5
这是一个小问题,但有 use_cols=['id']
在.read_parquet()
,利用了列式提取的 parquet 优势(dask 可能会在幕后进行一些优化,但如果您知道所需的列,那么明确说明没有什么坏处)。
现在,当您运行 df['id'].nunique()
时,这是 dask 将计算的 DAG:
分区越多,步骤就会越多,但很明显,当每个分区尝试发送相当大的数据时,存在潜在的瓶颈。对于高维列来说,该数据可能非常大,因此如果每个工作人员尝试发送需要 100MB 对象的结果,那么接收工作人员将必须拥有 5 倍的内存来接受数据(这可能会减少进一步计算值后)。
额外考虑的是单个工作人员在给定时间可以运行多少个任务。控制给定工作线程上可以同时运行多少个任务的最简单方法是 resources 。如果您使用 resources
启动集群:
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
然后每个工作线程都有指定的资源(在本例中是 1 个任意单位 foo
),因此如果您认为处理单个分区应该一次发生一个(由于内存占用较高),那么您可以做:
# note, no split_every is needed in this case since we're just
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})
这将确保任何单个工作人员一次忙于一项任务,从而防止内存使用过多。 (旁注:还有 .nunique_approx()
,您可能会感兴趣)
要控制任何给定工作线程接收以进行进一步处理的数据量,一种方法是使用 split_every
选项。以下是 split_every=3
的 DAG 的样子:
您可以看到,现在(对于此数量的分区),工作线程所需的最大内存是数据集最大大小的 3 倍。因此,根据您的工作内存设置,您可能需要设置 split_every
到一个较低的值(2、3、4 左右)。
一般来说,变量越独特,每个分区的具有唯一计数的对象需要的内存就越多,因此 split_every
的值越低。对于限制最大内存使用量将很有用。如果变量不是非常唯一,那么每个单独分区的唯一计数将是一个小对象,因此不需要 split_every
限制。
关于python - Dask Dataframe 独特操作 : Worker running out of memory (MRE),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66696579/