python - 如何将 Future 与来自 d​​ask.distributed(Python 库)的 Executor 的 map 方法一起使用?

标签 python python-2.7 distributed dask

我正在运行 dask.distributed簇。

我的任务包括链式计算,其中最后一步是使用 Executor.map 方法对在前面的步骤中创建的列表进行并行处理。列表的长度是事先不知道的,因为它是在计算过程中从中间结果生成的。

代码如下所示:

from distributed import Executor, progress


def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    gen_list1 = get_list_1()
    gen_f1 = e.map(generate_1, gen_list1)
    futures.append(gen_f1)

    gen_list2 = get_list_2()
    gen_f2 = e.map(generate_2, gen_list2)
    futures.append(gen_f2)

    m_list = e.submit(create_m_list)  # m_list is created from gen_list1 and gen_list2
                                      # some results of processing are stored in the database
                                      # and create_m_list doesn't need additional arguments
    futures.append(m_list)

    m_result = e.map(process_m_list, m_list)
    futures.append(m_result)

    return futures

if __name__ == '__main__':
    r = process()
    progress(r)

但是,我收到错误TypeError: zip argument #1 must support iteration:

File "F:/wl/under_development/database/jobs.py", line 366, in start_job
  match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
  iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration

gen_list1gen_list2 是独立计算的,但是 m_list 是从 gen_list1gen_list2< 创建的 因此取决于它们。

我也试过调用 m_list.result() 方法,但是,它阻塞了函数 process 直到 的计算code>gen_list1gen_list2 已经完成。

我也试过调用 m_list 的异步方法 ._result,但它产生了同样的错误“zip argument #1 must support iteration”。 dask.delayed (m_result = e.map(process_m_list, delayed(m_list))) 出现了同样的错误。

dask.distributed 的文档在这方面含糊不清,示例仅提及已经存在的真实列表对象。但是,SO 中的其他帖子以及 Google 都建议这应该是可能的。

这是我的 Python 发行版的版本字符串

Python 2.7.11 |Anaconda 自定义(64 位)| (默认,2016 年 2 月 16 日,09:58:36)Win32 上的 [MSC v.1500 64 位 (AMD64)]

最佳答案

问题的症结似乎在这里:

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)

你是正确的,你不能将一个函数映射到一个单独的 future 。您需要向 map 传递一个序列。 Dask 在不了解您的数据的情况下不知道要提交多少功能。在未来调用 .result() 将是一个很好的解决方案:

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())

I've also tried calling .result() method of m_list, however, it has blocked the function process until computations of gen_list1 and gen_list2 have finished.

没错。如果没有任何附加信息,调度程序将更喜欢较早提交的计算。您可以通过先提交 create_m_list 函数,然后提交您的额外计算,然后等待 create_m_list 结果来解决此问题。

m_list = e.submit(create_m_list)                   # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, gen_list_2())

L = m_list.result()                                # block on m_list until done
m_result = e.map(process_m_list, L)                # submit more tasks

return [f1, f2, m_result]

关于python - 如何将 Future 与来自 d​​ask.distributed(Python 库)的 Executor 的 map 方法一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39127531/

相关文章:

python - 如何处理南方(django)的重构?

python - pygame mouse.get_pos() 不工作

apache - 我的 WSGI 应用程序无法作为 Python 模块加载。我究竟做错了什么

python - 信号处理鼠兔/ python

python - 词频与词典理解

Python 2.7 : Passing class methods to another function

python - 从 big5 解码/编码为 utf-8 不起作用

testing - 具有机器人框架的多个远程库

.net - 如何测试 1000 个客户端 Windows 7 客户端

agile - 分布式开发团队——需要的工具