python - 多处理池map_async的意外行为

标签 python python-3.x multiprocessing python-multiprocessing

我有一些代码对 python 3 应用程序中的多个文件执行相同的操作,因此似乎是 multiprocessing 的绝佳候选者。 。我正在尝试使用 Pool将工作分配给一定数量的进程。我希望代码在进行这些计算时继续执行其他操作(主要是为用户显示内容),因此我想使用 map_async multiprocessing.Pool的功能为此类。我希望在调用此函数后,代码将继续,结果将由我指定的回调处理,但这似乎没有发生。以下代码显示了我尝试调用 map_async 的三种方法以及我看到的结果:

import multiprocessing
NUM_PROCS = 4
def func(arg_list):
    arg1 = arg_list[0]
    arg2 = arg_list[1]
    print('start func')
    print ('arg1 = {0}'.format(arg1))
    print ('arg2 = {0}'.format(arg2))
    time.sleep(1)
    result1 = arg1 * arg2
    print('end func')
    return result1

def callback(result):
    print('result is {0}'.format(result))


def error_handler(error1):
    print('error in call\n {0}'.format(error1))


def async1(arg_list1):
    # This is how my understanding of map_async suggests i should
    # call it. When I execute this, the target function func() is not called
    with multiprocessing.Pool(NUM_PROCS) as p1:
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)


def async2(arg_list1):
    with multiprocessing.Pool(NUM_PROCS) as p1:
        # If I call the wait function on the result for a small
        # amount of time, then the target function func() is called
        # and executes sucessfully in 2 processes, but the callback
        # function is never called so the results are not processed
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)
        r1.wait(0.1)


def async3(arg_list1):
    # if I explicitly call join on the pool, then the target function func()
    # successfully executes in 2 processes and the callback function is also
    # called, but by calling join the processing is not asynchronous any more
    # as join blocks the main process until the other processes are finished.
    with multiprocessing.Pool(NUM_PROCS) as p1:
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)
        p1.close()
        p1.join()


def main():
    arg_list1 = [(5, 3), (7, 4), (-8, 10), (4, 12)]
    async3(arg_list1)

    print('pool executed successfully')


if __name__ == '__main__':
    main()

何时 async1 , async2async3在 main 中调用,结果在每个函数的注释中描述。任何人都可以解释为什么不同的调用会有这样的行为吗?最后我想调用map_asyncasync1 中所做的那样,这样我就可以在工作进程繁忙时在主进程的其他部分做一些事情。我已经在较旧的 RH6 Linux 机器和较新的 ubuntu 虚拟机上使用 python 2.7 和 3.6 测试了此代码,得到了相同的结果。

最佳答案

发生这种情况是因为当您使用multiprocessing.Pool作为上下文管理器时,pool.terminate() is called when you leave the with block ,它会立即退出所有工作人员,而无需等待正在进行的任务完成。

New in version 3.3: Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().

IMO 使用 terminate() 作为上下文管理器的 __exit__ 方法并不是一个很好的设计选择,因为似乎大多数人直观地期望 close( ) 将被调用,它将等待正在进行的任务完成后再退出。不幸的是,您所能做的就是重构您的代码,不使用上下文管理器,或者重构您的代码,以便保证您不会离开 with block ,直到 Pool 被使用。完成其工作。

关于python - 多处理池map_async的意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48870608/

相关文章:

python - 如何在 Python 中将二进制数组写为图像?

python - 我怎么知道所有的 Futures 都在 Tornado 中解决了?

python - 如何在不在 NumPy 中复制的情况下展平多维数组的轴?

python - 如何使用Python脚本启动和停止包含 "http.server.serveforever"的Python脚本

python-3.x - 在 ANN 模型中加载 pickle 时接收错误

python - 如何从文件加载多个正则表达式模式并匹配给定的字符串?

python - 如何获取 subprocess.run 启动的进程的 pid 并杀死它

python multiprocessing无法控制多个长时间运行的控制台exe?

python - 运行超时子进程和管道结果后挂起父进程

python - 如何在Python中将变量与多个进程同步?