python - 避免 Python 3 的多处理队列中的竞争条件

标签 python python-3.x parallel-processing race-condition

我正在尝试找出大约 61 亿(自定义)项目的最大重量,我想通过并行处理来完成这项工作。对于我的特定应用程序,有更好的算法不需要我迭代超过 61 亿个项目,但解释它们的教科书让我头疼,我的老板希望在 4 天内完成。我想我可以更好地利用我公司的高级服务器和并行处理。然而,我对并行处理的所有了解都来自于阅读 Python documentation .也就是说我很迷路......

我目前的理论是设置一个供给进程、一个输入队列、一大堆(比如 30 个)工作进程和一个输出队列(找到输出队列中的最大元素将是微不足道的)。我不明白的是供给进程如何告诉工作进程何时停止等待项目通过输入队列。

我曾考虑过在我的 6.1E9 项目的可迭代对象上使用 multiprocessing.Pool.map_async,但在不对项目执行任何操作的情况下迭代这些项目需要将近 10 分钟。 除非我误解了什么......,让 map_async 遍历它们以将它们分配给进程可以在进程开始工作时完成。 (Pool 也提供了 imapdocumentation 说它类似于 map,似乎不是异步工作。我想要异步,对吗?)

相关问题:我想使用 concurrent.futures 而不是 multiprocessing 吗?我不可能是第一个实现双队列系统的人(这正是美国每家熟食店排队的方式......)那么是否有更 Pythonic/内置的方式来做到这一点?

这是我正在尝试做的事情的框架。 请参阅中间的评论 block 。

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following line three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not item.is_relevant():
            continue
        current_weight = item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q)

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


答案在这里

我从 Python 基金会传播总监 Doug Hellman 那里找到了对我的问题的非常详尽的答案,以及对多任务处理的温和介绍。我想要的是“毒丸”模式。在这里查看:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

支持@MRAB 发布该概念的内核。

最佳答案

您可以将一个特殊的终止项(例如 None)放入队列中。当一个 worker 看到它时,它可以把它放回去让其他 worker 看到,然后终止。或者,您可以将每个工作人员的一个特殊终止项放入队列中。

关于python - 避免 Python 3 的多处理队列中的竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10607747/

相关文章:

python - 使用 Python 创建可直接执行的跨平台 GUI 应用程序

python - 可以在 Python 的绘图命令中添加条件语句吗?

python - 为什么运行 py 和 kv 脚本时出现空白屏幕?

c# - 在 Parallel.For 中使用索引器的正确方法

r - 如何并行化代码以对 R 中的列表进行排序和求和?

python - 如何在 python 中将长二进制字符串转换为整数? (基础 > 36)

python - 机器学习算法中的示例顺序 (Scikit Learn)

c++ - OpenMP 是否复制私有(private)对象?

python - 如何将列表中元组的第 i 个元素映射到另一个列表中的键以形成字典

python-3.x - 使用 POST 将文件上传到 Flask 应用程序时出现 308 重定向