The following code shows a simple multiprocessing.Process pipeline with a shared dict of lists and a task queue for several consumers:
import multiprocessing

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_dict):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_dict = result_dict

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            # Do something with the next_task
            l = self.result_dict[5]
            l.append(3)
            self.result_dict[5] = l
            # alternative, but same problem
            #self.result_dict[5] += [3]
            self.task_queue.task_done()
        return
def provide_tasks(tasks, num_worker):
    low = [
        ['w1', 'w2'],
        ['w3'],
        ['w4', 'w5']
    ]
    for el in low:
        tasks.put(el)
    # Add a poison pill for each worker
    for i in range(num_worker):
        tasks.put(None)
if __name__ == '__main__':
    num_worker = 3
    tasks = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    results = manager.dict()
    lists = [manager.list() for i in range(1, 11)]
    for i in range(1, 11):
        results[i] = lists[i - 1]

    worker = [Consumer(tasks, results) for i in range(num_worker)]
    for w in worker:
        w.start()

    p = multiprocessing.Process(target=provide_tasks, args=(tasks, num_worker))
    p.start()

    # Wait for all of the tasks to finish
    p.join()
    print(results)
When you run this example with Python 3.x, you will get different outputs for the result dict. I actually expect the result dict to look like
{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
but for some executions it looks like this:
{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
Can someone explain this behavior to me? Why is one of the numbers missing?
Update with a workaround based on the suggested answer:
if next_task is None:
    with lock:
        self.result_dict.update(self.local_dict)
[...]
where lock is a manager.Lock() and self.local_dict is a defaultdict(list).
Moved the lock as suggested in the answer comments. Also added a version that does not work even with a lock.
# Works
with lock:
    l = self.result_dict[x]
    l.append(3)
    self.result_dict[x] = l
self.task_queue.task_done()
# Doesn't work. Even if I move the lock out of the loop.
for x in range(1, 10):
    with lock:
        l = self.result_dict[x]
        l.append(3)
        self.result_dict[x] = l
To make the second example work, we also need to call join on all workers.
Best answer
Fetching a local copy of the list, modifying it, and reassigning it to the manager dict is not an atomic operation, so it creates a race condition in which an append can be "lost".
This is described in this python bug report.
l = self.result_dict[5] # <-- race begins
l.append(3)
self.result_dict[5] = l # <-- race ends
A similar question about nondeterministic Python multiprocessing.Process behavior can be found on Stack Overflow: https://stackoverflow.com/questions/36161010/