Python multiprocessing.Process 行为不确定

标签 python python-3.x multiprocessing python-multiprocessing

以下代码显示了一个简单的 multiprocessing.Process 管道,其中包含一个共享的列表字典和一个用于不同消费者的任务队列:

import multiprocessing

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_dict):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_dict = result_dict

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))

            # Do something with the next_task
            l = self.result_dict[5]
            l.append(3)
            self.result_dict[5] = l
            # alternative, but same problem
            #self.result_dict[5] += [3]

            self.task_queue.task_done()
        return

def provide_tasks(tasks, num_worker):
    low = [ 
        ['w1', 'w2'],
        ['w3'],
        ['w4', 'w5']
    ]
    for el in low:
        tasks.put(el)
    # Add a poison pill for each worker
    for i in range(num_worker):
        tasks.put(None)

if __name__ == '__main__':
    num_worker = 3
    tasks = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()

    results = manager.dict()
    lists = [manager.list() for i in range(1, 11)]
    for i in range(1, 11):
        results[i] = lists[i - 1]

    worker = [Consumer(tasks, results) for i in range(num_worker)]
    for w in worker:
        w.start()

    p = multiprocessing.Process(target=provide_tasks, args=(tasks,num_worker))
    p.start()
    # Wait for all of the tasks to finish
    p.join()
    print(results)

当您使用 Python3.x 运行此示例时,您将收到结果字典的不同输出。我实际上希望结果字典看起来像

{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}

但对于某些执行,它看起来像这样:

{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}

有人可以向我解释这种行为吗?为什么少了一个数字?

根据建议的答案更新了解决方法:

if next_task is None:
    with lock:
        self.result_dict.update(self.local_dict)
        [...]

其中 lock 是一个 manager.Lock() 而 self.local_dict 是一个 defaultdict(list)

根据答案评论移动了锁定。还添加了一个不适用于锁的版本。

# Works
with lock:
    l = self.result_dict[x]
    l.append(3)
    self.result_dict[x] = l
self.task_queue.task_done()

# Doesn't work. Even if I move the lock out of the loop. 
for x in range(1, 10):
    with lock:
        l = self.result_dict[x]
        l.append(3)
        self.result_dict[x] = l

为了让第二个例子正常工作,我们还需要对所有 worker 调用 join

最佳答案

获取列表的本地副本、修改它并将其重新分配给管理器字典不是原子操作,因此会造成追加操作可能“丢失”的竞争条件。

描述于 this python bug report .

l = self.result_dict[5]  # <-- race begins
l.append(3)
self.result_dict[5] = l  # <-- race ends

关于Python multiprocessing.Process 行为不确定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36161010/

相关文章:

python - Unicode解码错误: 'ascii' codec can't decode byte 0xe2 in position 4: ordinal not in range(128)

python-3.x - 如何求二叉树中同一层的两个节点之间的水平距离?

python - 如何忽略存在空白值的行 Pandas Python

python - Celery worker 变量共享问题

python - 多处理池管理器命名空间 EOF 错误

python - Windows 上的多处理无限循环(Python)

python - django rest framework 在列表显示中隐藏特定字段?

具有命名字段访问的 python 张量

python - 我将如何循环这段代码以便它再次开始?

使用 -m 开关运行时,Python3 子模块安装程序不会更新路径