python - 如何顺序聚合dask Bag的内容?

标签 python dask

我想使用非关联的聚合函数顺序聚合分区集合的内容,因此我无法使用 Bag.foldBag.reduction

Bag.accumulate 似乎可以执行此操作,但它返回一个包,其中包含一些每个分区的中间结果,而不仅仅是最终聚合:

>>> import dask.bag as db
>>>
>>> def collect(acc, e):
...     if acc is None:
...         acc = list()
...     acc.append(e)
...     return acc
...
>>> b = db.from_sequence(range(10), npartitions=3)
>>> b.accumulate(collect, initial=None).compute()
[None,
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

基本上,我只对 accumulate 输出的最后一个元素感兴趣,我不想在内存中保留中间步骤的副本。

最佳答案

Bag 目前没有顺序归约操作,但可以。今天实现此目的的一个简单方法是使用上面的 use accumulate ,但只要求最后一个分区的最后一个元素。我们可以通过使用 Bag.to_delayed 将包转换为延迟值来相对轻松地完成此操作。

acc = b.accumulate(collect, initial=None)
partitions = acc.to_delayed()
partitions[-1][-1].compute()

关于python - 如何顺序聚合dask Bag的内容?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45304982/

相关文章:

python - 乳房 X 光检查中最大轮廓的 OpenCV 分割

python - Python 中 with 语句中的 file.close() 异常处理

python - 是否可以使用 dask 获取集合的交集?

python - Dask 数据帧 : Get row count?

python - 如何通过 .py 运行 blob 数据传输

python - Django makemessages 在 locale_paths 中看不到语言环境

python - Paramiko 和 exec_command - 杀死远程进程?

python - 如何在dask debug中关闭python实例

docker - 如何在 docker 镜像上执行完美的 Flow?

python - 你如何转置 dask 数据框(将列转换为行)以接近整洁的数据原则