python - joblib 的中间结果

标签 python parallel-processing multiprocessing python-multiprocessing joblib

我正在努力学习 joblib模块作为 python 中内置 multiprocessing 模块的替代品。我习惯于使用 multiprocessing.imap 在可迭代对象上运行一个函数并返回结果。在这个最小的工作示例中,我无法弄清楚如何使用 joblib :

import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:

    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x

打印:

Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2

我想看看输出:

Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2

或类似的东西,表明可迭代的 MP(...) 没有等待所有结果完成。对于更长的演示更改 n_jobs=-1range(100)

最佳答案

stovfl 的回答很优雅,但它只适用于第一批派出的。在示例中,它之所以有效,是因为 worker 们从不挨饿 (n_tasks < 2*n_jobs)。为了使这种方法起作用,回调最初传递给 apply_async也必须被调用。这是 BatchCompletionCallBack 的一个实例,它安排下一批要处理的任务。

一个可能的解决方案是将任意回调包装在一个可调用对象中,像这样(在 joblib==0.11, py36 中测试):

from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))

输出

Inside function 0
Inside function 1
    ImmediateResult function [0]
    ImmediateResult function [1]
Inside function 3
Inside function 2
    ImmediateResult function [3]
    ImmediateResult function [2]
Inside function 4
    ImmediateResult function [4]
Inside function 5
    ImmediateResult function [5]

关于python - joblib 的中间结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38483874/

相关文章:

Python SQL 查询 - 未知列

android - 通过 Python 执行 Android 命令并将结果存储在列表中

multithreading - 在单核系统中如何处理抢占?

performance - Haskell 并行运行速度较慢

python - 函数内部多处理

python - Flask App Builder 在数组上抛出 KeyError

scala - 模拟 Scala ParStream

python - 是否可以通过 C++ 扩展强制多个 python 进程共享相同的内存?

python - 多处理远程服务器和套接字错误

python - 我的腌制 cookies 在 selenium - python 中不起作用?