python - 扩展python Queue.PriorityQueue(工作优先级,工作包类型)

标签 python queue parallel-processing

我想扩展此处描述的 Queue.PriorityQueue:http://docs.python.org/library/queue.html#Queue.PriorityQueue

队列将优先保存工作包。 worker 将获得工作包并进行处理。我想补充以下几点:

  1. worker 也有优先权。当多个工作人员空闲时,具有最高优先级的工作人员应该处理传入的工作包。

  2. 并非每个工作人员都可以处理每个工作包,因此需要一种机制来检查工作包类型和工作人员能力是否匹配。

我正在寻找提示,如何最好地实现这一点(从头开始,扩展 PrioriyQueue 或 Queue,...)。

编辑

这是我的第一次(未经测试的)尝试。基本思想是所有等待的线程都会收到通知。然后他们都尝试通过_choose_worker(self,worker)获取工作项。 (使其成为社区维基)

编辑

现在可以进行一些简单的测试...

编辑_choose_worker 函数中添加了自定义 BaseManager 和工作人员列表的本地副本。

编辑 错误修复

import Queue
from Queue import Empty, Full
from time import time as _time
import heapq

class AdvancedQueue(Queue.PriorityQueue):

    # Initialize the queue representation
    def _init(self, _maxsize):
        self.queue = []
        self.worker = []

    def put(self, item, block=True, timeout=None):
        '''
        Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notifyAll()  # only change
        finally:
            self.not_full.release()

    def get(self, worker, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            self._put_worker(worker)

            if not block:
                if not self._qsize():
                    raise Empty
                else:
                    return self._choose_worker(worker)
            elif timeout is None:
                while True:
                    while not self._qsize():
                        self.not_empty.wait()
                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        self.not_empty.wait()

            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                def wait(endtime):
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)

                while True:
                    while not self._qsize():
                        wait(endtime)

                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        wait(endtime)
        finally:
            self._remove_worker(worker)
            self.not_empty.release()

    # Put a new worker in the worker queue
    def _put_worker(self, worker, heappush=heapq.heappush):
        heappush(self.worker, worker)

    # Remove a worker from the worker queue
    def _remove_worker(self, worker):
        self.worker.remove(worker)

    # Choose a matching worker with highest priority
    def _choose_worker(self, worker):
        worker_copy = self.worker[:]    # we need a copy so we can remove assigned worker
        for item in self.queue:
            for enqueued_worker in worker_copy:
                if item[1].type in enqueued_worker[1].capabilities:
                    if enqueued_worker == worker:
                        self.queue.remove(item)
                        self.not_full.notify()
                        return item
                    else:
                        worker_copy.remove(enqueued_worker)
                        # item will be taken by enqueued_worker (which has higher priority),
                        # so enqueued_worker is busy and can be removed
                        continue
        raise Empty

最佳答案

我认为您描述的是一种情况,其中有两个“优先级队列” - 一个用于作业,一个用于 worker 。天真的方法是选择最优先的工作和最优先的 worker ,并尝试将它们配对。但是,当工作人员无法执行作业时,这当然会失败。

要解决此问题,我建议首先执行最高优先级的作业,然后按优先级降序遍历所有工作线程,直到找到可以处理该作业的工作线程。如果没有一个工作人员可以处理该作业,则执行第二高优先级的作业,依此类推。因此,您实际上有嵌套循环,如下所示:

def getNextWorkerAndJobPair():
    for job in sorted(jobs, key=priority, reverse=True):
        for worker in sorted(workers, key=priority, reverse=True):
             if worker.can_process(job):
                 return (worker, job)

上面的示例对数据进行了多次不必要的排序。为了避免这种情况,最好按排序顺序存储数据。至于使用什么数据结构,我不太确定什么是最好的。理想情况下,您需要 O(log n) 次插入和删除,并且能够在 O(n) 时间内按排序顺序迭代集合。我认为 PriorityQueue 满足第一个要求,但不满足第二个要求。我想象来自 blist 的排序列表包可以工作,但我自己还没有尝试过,并且该网页没有具体说明此类提供的性能保证。

我建议首先迭代作业,然后迭代内部循环中的工作人员的方法并不是您可以采取的唯一方法。您还可以颠倒循环顺序,以便首先选择优先级最高的工作人员,然后尝试为其找到工作。或者您可以找到对于某些函数 f 具有 f(priority_job,priority_worker) 最大值的有效(作业, worker )对(例如仅添加优先级)。

关于python - 扩展python Queue.PriorityQueue(工作优先级,工作包类型),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3849157/

相关文章:

parallel-processing - CUDA 中的忙碌旋转

parallel-processing - Windows Azure : Parallelization of the code

python - API 访问失败期间引发的适当异常

iphone - iPhone 上的线性代数(python/numpy?)

java - 用于显示错误消息的基于拉取的队列

php - Laravel 5.2 队列 - 延迟不起作用

java - 有没有在数组之上实现队列并自动调整大小的java库?

python - 如何在毫秒级同步 Python 进程?

python - 基于模式的解析不是在开头

python - 在 python 中生成结构化文本的想法或模块?