python - 如何在多处理中使用队列设置管道

标签 python multiprocessing

这是我的代码,它应该做一些与 this other question 非常相似的事情正在尝试做,特别是这张图是相关的:Process Diagram 其中 f1 = 生产,f2 = f3 = worker ,f4 = 消费者。

我还没有尝试解决完美结束一切的问题,这不是这个问题的主题。我收到错误“RuntimeError:队列对象只能通过继承在进程之间共享” 而且我不确定该怎么做才能解决它。我只想将队列传递给 Go 的 channel 之类的函数,真的。这是代码。

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        ans.append(i)
    return ans


def main(n):
    pool = multiprocessing.Pool(4)
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    pool.apply_async(produce, (n, in_queue))
    for i in range(2):
        pool.apply_async(worker, (in_queue, out_queue))
    result = consumer(out_queue)
    pool.close()
    pool.join()
    return result

main(200)

我该如何修复它?

有更简单的方法吗?

我已经尝试过 Pool.map,但我想让它正常工作。

最佳答案

multiprocessing.Pool 已经设置了必要的 IPC 机制,允许您在它启动后将作业提交给它的工作人员,但是您不能稍后将队列或类似的东西作为争论。这就是为什么您的代码不起作用的原因。在启动子进程时,它必须知道如何与其父进程通信。

所以如果你需要设置自己的队列,你应该直接使用multiprocessing.Process。此外,您正在写的是典型的 worker ,他们在循环中等待新工作并处理它们。在 worker 池中运行这样的 worker 不是你想做的事情。

这样你的代码就可以工作了:

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        print(i)
        ans.append(i)
    return ans


def main(n):
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    producer = multiprocessing.Process(target=produce, args=(n, in_queue))
    for i in range(2):
        w = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
        w.start()
    producer.start()
    res = consumer(out_queue)

main(200)

我在您的consumer 中添加了一条打印语句,以表明正在发生某些事情。 consumer 函数永远不会终止,因为您从队列中读取的代码会等待终止 None 而它永远不会出现,因为工作人员和生产者都不会将其放入队列中。 ..

关于python - 如何在多处理中使用队列设置管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17305173/

相关文章:

python - Try/except 异常类型的特定错误

python-3.x - 如何最好地使用 Python 并行化 grakn 查询?

python - 使用导入模块进行多处理

python - 在IPython中运行ProcessPoolExecutor

php - 如何从 PHP 编译 Python?

python - root用户执行失败

python - pip 安装时出现 Azure DevOps python 提要错误

python - 如何更改进度条的位置 - 多重处理

Python 多处理池与多处理线程池

python - 类型转换原始字符串python