python - Dask:跨嵌套列表并行化

标签 python parallel-processing dask

我在跨嵌套列表列表并行化时遇到问题。

我的问题的简化版本如下所示:

我有一个列表列表:

[[a, b], [c, d, e], [f, g, h, i], [j], [k, l], ...]  # let's call this "L"

在 Dask 中,我使用一个函数来创建它们,因为创建每个子列表需要很长时间,而我大约有几百个。

def parse(filename):
    return list_of_lists

L = []
for fname in filenames:
    L.append(client.submit(parse, fname))

不过,这里的关键是我想将子列表视为单个扩展元素列表(即 [a, b, c,... l, ...] ) 并在每个函数上并行化另一个长时间运行的函数,但不收集我的元素并展平列表。这是因为执行以下操作也需要很长时间:

from itertools import chain
L = client.gather(L)  # the gather step takes time
L = list(chain.from_iterable(L))  # flattening is fast
L_future = client.scatter(L)  # the scatter step takes time too

def func2(element):
    """Also takes a long time."""
    ...
    return result

results = client.map(func2, L_future)

有没有办法以更优化的方式实现这一目标?

最佳答案

是的,有多种方法可以做到这一点。

这里有一个例子:https://examples.dask.org/applications/evolving-workflows.html

关于python - Dask:跨嵌套列表并行化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58209596/

相关文章:

python - 有条件生成新列-Pandas

ruby - 在 Ruby 中线程化 sqlite 连接

python - 带有大文件的 Amazon s3 上的 dask read_csv 超时

python - 在Python中从csv读取元组

python - QWebEngineView:显示加载进度

c++ - 优化矩阵旋转 - 关于矩阵中心的任意角度

python - 将 dask 数据帧保存到 csv 时如何纠正错误?

python - 使用 GUI 从 Python 代码运行 dask 调度程序

python - 如何在python中安装gensim并运行包?

go - 保存并行化 goroutine 的结果