Python multiprocessing: self-feeding consumers lock up forever

Tags: python process queue multiprocessing

The problem is that the consumers never exit; they just hang and do nothing. The code is meant to work like this:

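A queue is created and some task data is put into it. A specified number of consumers are created to process the data. When a consumer finds the queue empty, it cannot simply leave, because another consumer may still put something into the queue; instead it records in the consumers_finished dict that it has nothing to work on. The consumer loop keeps running until every worker has indicated that it is finished. There is no way to know in advance how much work there will be, because the consumers themselves put new tasks into the queue. I have read up on this a bit, but it is still not clear to me whether processes can end up waiting forever when they feed themselves.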
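For comparison, a single-process version of this self-feeding loop needs no coordination at all: an empty work list really does mean that all work is done. The sketch below assumes a hypothetical `process_task` (standing in for the question's `do_some_processing`, whose real behaviour is not shown) that turns task `n` into two follow-up tasks `n // 2` until `n <= 1`, so the total amount of work is only known once the loop ends.

```python
from collections import deque


def process_task(n):
    # Hypothetical stand-in for do_some_processing(): task n spawns
    # two follow-up tasks n // 2 until n <= 1.
    return [n // 2, n // 2] if n > 1 else []


def run_sequential(initial):
    todo = deque([initial])
    processed = 0
    while todo:  # one worker: "queue empty" really does mean "no more work"
        task = todo.popleft()
        processed += 1
        todo.extend(process_task(task))
    return processed


print(run_sequential(4))  # 7
```

With several consumers this simple test no longer works: the queue can be momentarily empty while another consumer is still processing a task that will enqueue more work, which is why the question needs something like the consumers_finished flags.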

import multiprocessing


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, results, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.results = results
        self.consumers_finished = consumers_finished

    def run(self):
        # Keep looping until every consumer has flagged itself as finished.
        while not all(flag for flag in self.consumers_finished.values()):
            # Blocking get(): if the queue is empty this waits forever, so a
            # consumer stuck here never re-checks the finished flags.
            task_data = self.task_queue.get()
            if not task_data:
                self.consumers_finished[self.name] = True
                continue

            self.consumers_finished[self.name] = False
            task_result = self.do_some_processing(task_data)
            # Self-feeding: the result goes back onto the task queue.
            self.task_queue.put(task_result)


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = multiprocessing.Queue()
        results = manager.list()
        consumers_finished = manager.dict()

        consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        task_queue.put(task_data)

        for consumer in consumers: consumer.join()

        return results
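The hang can be reproduced without any consumers at all: on an empty `multiprocessing.Queue`, `get()` with no arguments blocks indefinitely, whereas `get(timeout=...)` (or `get(False)`) raises `queue.Empty`. The helper name `poll` is hypothetical and used only for this sketch.

```python
import multiprocessing
import queue


def poll(q, timeout):
    """Return the next item from q, or None if nothing arrives within timeout seconds."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    print(poll(q, 0.1))  # None; a bare q.get() here would never return
    q.put("task")
    print(poll(q, 1.0))  # task
```

Returning `None` for "nothing arrived" only works if `None` is never a real item; otherwise catch `queue.Empty` at the call site instead.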

Best answer

It seems a good night's sleep really does help; a fresh mind can do a lot more. Anyway, after studying the Python documentation I found a solution.

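import multiprocessing
import queue  # provides queue.Empty, raised by a non-blocking get() on an empty queue


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, results, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.results = results
        self.consumers_finished = consumers_finished

    def run(self):
        while not all(flag for flag in self.consumers_finished.values()):
            try:
                # Non-blocking get: raises queue.Empty instead of waiting,
                # so the loop condition is re-checked on every pass.
                task_data = self.task_queue.get(False)
                self.consumers_finished[self.name] = False
            except queue.Empty:
                # Nothing queued right now: mark this consumer idle and
                # re-check the flags (this busy-polls while the queue is empty).
                self.consumers_finished[self.name] = True
                continue

            task_result = self.do_some_processing(task_data)
            self.task_queue.put(task_result)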


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = manager.Queue()
        results = manager.list()
        consumers_finished = manager.dict()

        consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        task_queue.put(task_data)

        for consumer in consumers: consumer.join()

        return results

Here is the part of the Python documentation that, I believe, explains the problem:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.
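The Programming guidelines section that the warning points to makes the same point: items a child has put on a plain `multiprocessing.Queue` must be consumed before that child is joined. A minimal sketch of the safe ordering follows; `producer`, `get_then_join` and the payload size are assumptions chosen for illustration (a payload large enough that it cannot sit entirely in the pipe buffer).

```python
import multiprocessing


def producer(q, size):
    # The child's feeder thread keeps the child alive until this large
    # item has been fully written to the pipe, i.e. until someone reads it.
    q.put("X" * size)


def get_then_join(size):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q, size))
    p.start()
    data = q.get()  # consume first...
    p.join()        # ...then join; swapping these two lines can deadlock
    return len(data), p.exitcode


if __name__ == "__main__":
    print(get_then_join(1_000_000))  # (1000000, 0)
```

A manager-created queue such as `manager.Queue()` lives in the manager's server process, which is why the warning says it does not have this issue.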

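So I simply changed the queue: it is now created by the manager, and the consumer's run method takes tasks from the queue differently (a non-blocking get that catches the empty-queue exception); see the code above.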
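The non-blocking loop works, but it busy-polls while the queue is empty, and one consumer can see every flag set to True (and exit) in the gap between another consumer taking a task and clearing its own flag. A different technique, not the one used in this answer, is to let a `multiprocessing.JoinableQueue` count outstanding tasks: the count goes up on every `put()` and down on every `task_done()`, so if a consumer enqueues its follow-up tasks before calling `task_done()`, the parent's `join()` returns only when no work is left. This is a sketch under stated assumptions: `process_task` is hypothetical (it returns a result plus follow-up tasks), and `None` is assumed never to be a real task or result, so it is used as a stop marker.

```python
import multiprocessing

STOP = None  # assumption: None is never a real task or result


def process_task(n):
    # Hypothetical stand-in for do_some_processing(): returns a result plus
    # follow-up tasks; task n spawns two tasks n // 2 until n <= 1.
    return n, ([n // 2, n // 2] if n > 1 else [])


def consumer(tasks, results):
    while True:
        task = tasks.get()           # blocking is fine: a STOP always arrives
        if task is STOP:
            tasks.task_done()
            results.put(STOP)        # tell the parent this worker is finished
            return
        result, follow_ups = process_task(task)
        results.put(result)
        for t in follow_ups:
            tasks.put(t)             # enqueue follow-ups BEFORE task_done()
        tasks.task_done()


def run(initial, consumers_count=4):
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=consumer, args=(tasks, results))
               for _ in range(consumers_count)]
    for w in workers:
        w.start()
    tasks.put(initial)
    tasks.join()                     # every put() now has a matching task_done()
    for _ in workers:
        tasks.put(STOP)              # exactly one stop marker per worker
    collected, stopped = [], 0
    while stopped < consumers_count: # drain results BEFORE joining workers
        item = results.get()
        if item is STOP:
            stopped += 1
        else:
            collected.append(item)
    for w in workers:
        w.join()
    return sorted(collected)


if __name__ == "__main__":
    print(run(4))  # [1, 1, 1, 1, 2, 2, 4]
```

Each worker puts its STOP marker on `results` after all of its real results, so once the parent has counted one marker per worker, every result has been read and joining the workers cannot hit the deadlock described in the warning.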

Regarding "Python multiprocessing: self-feeding consumers lock up forever", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/7958560/
