python - Dask Dataframe 独特操作 : Worker running out of memory (MRE)

标签 python dataframe memory dask dask-dataframe

tl;博士

我想要

dd.read_parquet('*.parq')['column'].nunique().compute()

但我明白了

WARNING - Worker exceeded 95% memory budget. Restarting

在 worker 全部被杀之前有几次。


长版

我有一个数据集

  • 100 亿行,
  • ~20 列,

以及一台内存约为 200GB 的机器。我正在尝试使用 dask 的 LocalCluster 来处理数据,但即使我使用相当小的子集并尝试使用基本操作,我的工作人员很快就会超出其内存预算并被杀死。

我重新创建了一个玩具问题来演示以下问题。

合成数据

为了在较小的范围内近似解决上述问题,我将创建一个包含 32 个字符 id 的单列

  • 一百万个唯一 ID
  • 总长度2亿行
  • 分割成 100 个 parquet 文件

结果将是

  • 100 个文件,每个 66MB,作为 Pandas 数据帧加载时占用 178MB(通过 df.memory_usage(deep=True).sum() 估计))
  • 如果作为 pandas 数据帧加载,则所有数据在内存中占用 20GB
  • 一个包含所有 ID 的系列(我认为工作人员在计算 nunique 时也必须保留在内存中)大约需要 90MB
import string
import os

import numpy as np
import pandas as pd

chars = string.ascii_letters + string.digits

n_total = int(2e8)
n_unique = int(1e6)

# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)

outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)

# Sample from the ids to create 100 parquet files
for i in range(100):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')

尝试解决方案

假设我的机器只有 8GB 内存。根据 Wes Kinney 的 rule of thumb 的说法,由于分区大约需要 178MB 且结果 90MB ,我可能需要高达 2-3Gb 的内存。因此,要么

  • n_workers=2、memory_limit='4GB'
  • n_workers_1,memroy_limit='8GB'

似乎是一个不错的选择。可悲的是,当我尝试时,我得到了

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

几次,然后 worker 就被彻底杀死了。

import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)

dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()

事实上,例如,对于 4 个工作人员,他们每个人都需要 6GB 内存才能执行任务。

这种情况可以改善吗?

最佳答案

这是反复出现的问题的一个很好的例子。唯一让人震惊的是delayed在合成数据创建期间未使用:

import dask
@dask.delayed
def create_sample(i):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')    
    return

# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)

对于以下答案,我实际上只会使用少量分区(因此更改为 range(5) ),以获得合理的可视化效果。让我们从加载开始:

df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), use_cols=['id'])
print(df.npartitions) # 5

这是一个小问题,但有 use_cols=['id'].read_parquet() ,利用了列式提取的 parquet 优势(dask 可能会在幕后进行一些优化,但如果您知道所需的列,那么明确说明没有什么坏处)。

现在,当您运行 df['id'].nunique() 时,这是 dask 将计算的 DAG:

enter image description here

分区越多,步骤就会越多,但很明显,当每个分区尝试发送相当大的数据时,存在潜在的瓶颈。对于高维列来说,该数据可能非常大,因此如果每个工作人员尝试发送需要 100MB 对象的结果,那么接收工作人员将必须拥有 5 倍的内存来接受数据(这可能会减少进一步计算值后)。

额外考虑的是单个工作人员在给定时间可以运行多少个任务。控制给定工作线程上可以同时运行多少个任务的最简单方法是 resources 。如果您使用 resources 启动集群:

cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})

然后每个工作线程都有指定的资源(在本例中是 1 个任意单位 foo ),因此如果您认为处理单个分区应该一次发生一个(由于内存占用较高),那么您可以做:

# note, no split_every is needed in this case since we're just 
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})

这将确保任何单个工作人员一次忙于一项任务,从而防止内存使用过多。 (旁注:还有 .nunique_approx() ,您可能会感兴趣)

要控制任何给定工作线程接收以进行进一步处理的数据量,一种方法是使用 split_every选项。以下是 split_every=3 的 DAG 的样子:

enter image description here

您可以看到,现在(对于此数量的分区),工作线程所需的最大内存是数据集最大大小的 3 倍。因此,根据您的工作内存设置,您可能需要设置 split_every到一个较低的值(2、3、4 左右)。

一般来说,变量越独特,每个分区的具有唯一计数的对象需要的内存就越多,因此 split_every 的值越低。对于限制最大内存使用量将很有用。如果变量不是非常唯一,那么每个单独分区的唯一计数将是一个小对象,因此不需要 split_every限制。

关于python - Dask Dataframe 独特操作 : Worker running out of memory (MRE),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66696579/

相关文章:

python - 对于 Pandas 的 value_counts() 循环(嵌套)

c++ - 新建然后删除后的内存消耗

python - 如何知道服务是否已安装

python - 用数据填充 pandas Panel 对象

python - jinja2(Flask)中是否默认自动转义?

pointers - RAM 中的静态指针是什么,它们如何存在?

c - 变量的内存分配

python - os.chroot 操作不允许

r - 当我在数据框中仅使用列名的初始部分时,为什么 R 不会引发错误?

python - Pandaic 方法检查数据框是否有任何行