python 或 dask 并行生成器?

标签 python pandas python-multiprocessing dask

是否有可能在Python中(也许使用dask,也许使用多处理)将生成器“放置”在核心上,然后并行地单步执行生成器并处理结果?

它特别需要是生成器(或带有 __iter__ 的对象);生成器生成的所有生成元素的列表无法装入内存。

特别是:

使用 pandas,我可以调用 read_csv(...iterator=True),它为我提供了一个迭代器 (TextFileReader) - 我可以 for in 它或显式调用接下来多次。整个 csv 永远不会被读入内存。不错。

每次我从迭代器读取下一个 block 时,我也会对其执行一些昂贵的计算。

但现在我有 2 个这样的文件。我想创建 2 个这样的生成器,并将 1 个“放置”在一个核心上,将 1 个放置在另一个核心上,这样我就可以:

 result = expensive_process(next(iterator))

在每个核心上并行,然后组合并返回结果。重复此步骤,直到一台发电机或两台发电机都超出产量。

看起来 TextFileReader 不可 pickle,生成器也不可。我不知道如何在 dask 或多处理中执行此操作。有这方面的模式吗?

最佳答案

Dask 的 read_csv 旨在以 block 的形式从多个文件加载数据, block 大小可以指定。当您对生成的数据帧进行操作时,您将按 block 进行工作,这正是使用 Dask 的首要目的。应该不需要使用迭代器方法。

您最有可能想要使用的 dask 数据帧方法是 map_partitions()

如果您真的想使用迭代器的想法,您应该研究dask.delayed,它能够通过发送每次调用来并行化任意Python函数。函数(每个函数都有不同的文件名)给您的工作人员。

关于python 或 dask 并行生成器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53329891/

相关文章:

python - 为什么这个方法不返回值?

python - Pandas 数据框 : how to permute rows and create new groups of combinations

python - 数据打印不正确

python-3.x - Python多处理池: How to get all processes running currently

python - 在 Django 中创建自定义命令

python - 如何为Python 3.1 ubuntu服务器10.04安装请求​​模块?

python - 安装 Jupyter Notebook 后,NumPy 和 TensorFlow 无法正常工作

python Pandas : How to choose a certain option within duplicates

python - 选定行和列的 Pandas min()

python - Python 多处理队列是否有序?