使用队列列表的 Python 多处理

标签 python multithreading list serialization queue

我尝试使用 Python 2.7 多处理包创建队列列表。每个子进程拥有一个单独的队列并有两个任务:从自己的队列中获取元素并将元素放入其他子进程的队列中。因此,每个子进程都必须知道哪个队列属于它,这就是我使用队列列表的原因。

我做了如下代码:

mgr = multiprocessing.Manager()

sharedQueueList = mgr.list()

for i in xrange(num_processes):
  sharedQueueList .append(mgr.Queue())

但是,我收到以下错误消息:

**raise convert_to_error(kind, result)**
RemoteError: 
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', < Queue.Queue instance at 0x02AD3170 >)
---------------------------------------------------------------------------

最佳答案

在父级中创建 Queue 列表,在创建时将一些交给每个 worker。每个 worker 将从它的一个队列中取出作业,输出到另一个队列。

import logging, multiprocessing

def myproc(arg):
    return arg*2

def worker(qlist):
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        job = qlist[0].get()
        logger.info('got %s', job)
        if job is None:
            logger.info('exiting')
            return
        qlist[1].put( myproc(job) )

logger = multiprocessing.log_to_stderr(
    level=logging.INFO,
)
logger.info('setup')

numProcs = 3
queueList = [ multiprocessing.Queue() for i in xrange(numProcs) ]

# prefill with 3 jobs
for num in range(3):
    queueList[0].put(num)
# signal end of jobs
queueList[0].put(None)

worker_p = multiprocessing.Process(
    target=worker, args=( [queueList[0], queueList[1]], ),
    name='worker',
)
worker_p.start()

worker_p.join()

logger.info('done')

运行示例:

[INFO/MainProcess] setup
[INFO/worker] child process calling self.run()
[INFO/worker] start
[INFO/worker] got 0
[INFO/worker] got 1
[INFO/worker] got 2
[INFO/worker] got None
[INFO/worker] exiting
[INFO/worker] process shutting down
[INFO/worker] process exiting with exitcode 0
[INFO/MainProcess] done
[INFO/MainProcess] process shutting down

关于使用队列列表的 Python 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18390306/

相关文章:

c++ - 为什么我不能推回列表 C++

python - 如何在 python 中生成列表之间所有可能的组合?

java - 将字符串中的正则表达式模式替换为取决于匹配的替换字符串

c++ - 在 boost 中禁用中断会禁用上下文切换吗?

python - 在线程中打印不适用于python 3

python - 获取句子中字母的频率

python - 嵌套 for 循环的更多 Pythonic 方式

python - 如何使用python docx将表格边框添加到word doc

python - 为什么 1 行 DataFrame 上的 collect() 使用 2000 个执行器?

java - Simpledate 的线程安全代码