python多处理共享队列重新排序

标签 python multithreading client-server queue multiprocessing

我有一个服务器和几个客户端。他们都共享一个任务和结果 multiprocessing.Queue。但是,每当客户端完成任务并将结果放入结果队列时,我希望服务器查看结果,并基于此重新排序任务队列。

这当然意味着从任务队列中弹出所有内容并重新添加。在此重新排序过程中,我希望客户端阻止接触任务队列。我的问题是如何让服务器识别何时将任务添加到结果队列,并通过锁定任务队列并在保护队列的同时重新排序来使用react。不变的是,在客户端获得新任务之前,服务器必须在返回每个结果后重新排序。

我想一个简单(但错误)的方法是让 multiprocessing.Value 充当 bool 值,无论何时添加结果,客户端都会将其翻转为 True,这意味着已添加结果。服务器可以通过轮询获取此值,但最终它可能会错过轮询之间出现的另一个客户端并添加另一个结果。

任何想法表示赞赏。

** 'multithreading' 标签只是因为它与线程中的思想非常相似,我认为这里的进程/线程区别并不重要。

最佳答案

让我们尝试一些代码 - 有进展总比没有好 ;-) 部分问题是确保如果结果队列中有内容,则不会从任务队列中获取任何内容,对吗?所以队列是紧密相连的。这种方法将两个队列都置于锁的保护之下,并使用条件来避免任何轮询的需要:

设置,在服务器中完成。 taskQresultQtaskCondresultCond 必须传递给客户端进程(lock 无需显式传递 - 它包含在条件中):

import multiprocessing as mp
taskQ = mp.Queue()
resultQ = mp.Queue()
lock = mp.Lock()
# both conditions share lock
taskCond = mp.Condition(lock)
resultCond = mp.Condition(lock)

客户端获取任务;所有客户端都使用此功能。请注意,只要结果队列中有内容,任务就不会被消耗:

def get_task():
    taskCond.acquire()
    while taskQ.qsize() == 0 or resultQ.qsize():
        taskCond.wait()
    # resultQ is empty and taskQ has something
    task = taskQ.get()
    taskCond.release()
    return task

客户端有结果:

with resultCond:
    resultQ.put(result)
    # only the server waits on resultCond
    resultCond.notify()

服务器循环:

resultCond.acquire()
while True:
    while resultQ.qsize() == 0:
        resultCond.wait()
    # operations on both queues in all clients are blocked now
    # ... drain resultQ, reorder taskQ ...
    taskCond.notify_all()

注意事项:

  1. qsize() 通常是概率性的,但由于所有队列操作都是在持有锁时完成的,因此在这种情况下它是可靠的。

  2. 事实上,因为所有的队列操作在这里都被我们自己的锁所保护,所以真的没有必要使用mp.Queue。例如,mp.Manager().list() 也可以工作(任何共享结构)。当您重新安排任务时,列表可能更容易处理?

  3. 有一部分我不太喜欢:当服务器执行 taskCond.notify_all() 时,一些客户端可能正在等待获取新任务,而其他客户端可能正在等待返回一个新的结果。它们可以按任何顺序运行。一旦任何等待返回结果的客户端有机会,所有等待获取任务的客户端都会阻塞,但在此之前任务将被消耗。当然,这里的“问题”是我们不知道在将某些内容实际添加到结果队列之前有新结果在等待。

对于最后一个,也许将“客户端有结果”代码更改为:

resultQ.put(result)
with resultCond:
    resultCond.notify()

会更好。不确定。它确实使推理变得更加困难,因为所有队列操作都在我们的锁的保护下完成不再是真的。

关于python多处理共享队列重新排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19286593/

相关文章:

python - TypeError: 'WebElement' 对象不可下标

c++ - 无限等待条件变量

node.js - 使用worker_threads的 typescript 配置

client-server - Meteor 是如何工作的,客户端还是服务器?

python - .findall() 可以匹配 python etree 中的多个值吗?

python - 当尝试将函数与Python中的图像匹配时,有没有办法计算残差?

python - 尝试为双向链表编写交换位置函数

multithreading - 使用 HT 在一个 Core 上执行的线程之间的数据交换将使用什么?

PHP : Best way to push data from server to clients. ....?

client-server - 如何防止非法客户端应用程序使用我的服务器?