python - 由于阻塞 Queue.get() 方法导致的死锁

标签 python python-2.7 queue multiprocessing producer-consumer

正如标题所暗示的,我有一个僵局,不知道为什么。我有多个生产者,只有一个消费者。 schedule_task方法在线程调用了队列的get方法后会被多个进程调用

from logging import getLogger
from time import sleep
from threading import Event, Thread
from multiprocessing import Process
from Queue import Queue


class TaskExecutor(object):
    def __init__(self):
        print("init taskExecutor")
        self.event = Event()
        self.taskInfos = Queue()
        task_thread = Thread(target=self._run_worker_thread)
        self._instantEnd = False
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        print("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                print("try to get queued task from %s" % str(self.taskInfos))
                msg, task = self.taskInfos.get()
                print("got task %s for msg: %s" % str(task), str(msg))
                task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                logger.error("Error: %s" % e.message)
        print("shutting down TaskExecutor!")

    def is_running(self):
        return True

    def schedule_task(self, msg, task):
        try:
            print("appending task '%s' for msg: %s" % (str(task), str(msg)))
            self.taskInfos.put((msg, task))
            print("into queue: %s " % str(self.taskInfos))
        except Exception, e:
            print("queue is probably full: %s" % str(e))


class Task(object):

    def execute(self, msg):
        print(msg)


executor = TaskExecutor()

def produce():
    cnt = 0
    while True:
        executor.schedule_task("Message " + str(cnt), Task())
        cnt += 1
        sleep(1)

for i in range(4):
    p = Process(target=produce)
    p.start()

从我的日志中我得到:

init taskExecutor
start running taskExcecutor worker Thread
try to get queued task from <Queue.Queue instance at 0x7fdd09830cb0>
 appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f35d0>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3490>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 

谁能解释一下,我错过了什么?

最佳答案

虽然其他人不可能运行此代码(它不是独立的),但您展示的内容 没有明显的问题。所以问题出在您没有展示的地方 - 可能在创建和使用 TaskExecutor 实例的代码中.

当我插入我凭空弥补的缺失部分时,这段代码工作正常。

所以你需要展示的不仅仅是这个。如何替换:

logger.debug("try to get queued task")

logger.debug("try to get queued task from queue %s", self.taskInfos)

?然后至少我们可以看到您的生产者是否正在使用与您的消费者相同的队列。

下一步

感谢您添加。接下来:这是一个独立的程序供您试用。这很像你的代码。查看它是否适合您(适合我):

from threading import Thread, Lock
from Queue import Queue

class Logger:
     def __init__(self):
         self.iolock = Lock()

     def debug(self, str, *msg):
         with self.iolock:
             print str % msg

     error = debug

logger = Logger()

class TaskExecutor(object):
    def __init__(self):
        logger.debug("init taskExecutor")
        self.taskInfos = Queue()
        task_thread = Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def is_running(self):
        return True

    def _run_worker_thread(self):
        logger.debug("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                logger.debug("try to get queued task from queue %s", self.taskInfos)
                msg, task = self.taskInfos.get()
                logger.debug("got task %s for msg: %s", str(task), str(msg))
                #task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                logger.error("Error: %s", e.message)
        logger.debug("shutting down TaskExecutor!")

    def schedule_task(self, msg, task):
        try:
            logger.debug("appending task '%s' for msg: %s", str(task), str(msg))
            self.taskInfos.put((msg, task))
            logger.debug("into queue: %s ", str(self.taskInfos))
        except Exception, e:
            logger.debug("queue is probably full: %s", str(e))

te = TaskExecutor()

def runit():
    for i in range(10):
        te.schedule_task("some task", i)

main = Thread(target=runit)
main.start()

下一步

好的,这段代码不可能工作。在 Linux-y 系统上,只有一个 TaskExecutor 实例在这里创建:

executor = TaskExecutor()

这发生在主程序中。每次你这样做:

p = Process(target=produce)

你的主程序是fork() '编辑。虽然 fork 进程也看到 executor ,这是主程序的 executor 的地址空间副本 , 与 executor 无关在主程序中(通常的写时复制 fork() 语义)。

每个子进程也有一个executor副本的数据成员,包括它的 Queue .所有子进程都将数据放在自己唯一的 executor 副本中。 ,但消费者线程在主程序中运行,而工作进程没有对其 executor 的副本执行任何操作可以对主程序的消费者线程看到的内容产生任何影响。

所以这真的很困惑。我现在必须停下来尝试弄清楚你可能真正想在这里做什么;-) 如果你想玩弄想法,请使用 multiprocessing.Queue 进行调查。 .进程间通信的唯一方法是使用从头开始构建的对象来支持进程间通信。 Queue.Queue永远不会为此工作。

还有一个

这是跨进程运行良好的一个,甚至在 Windows 上也运行良好 ;-)

from time import sleep
from threading import Thread
from multiprocessing import Process, JoinableQueue

class TaskExecutor(Thread):
    def __init__(self):
        print("init taskExecutor")
        Thread.__init__(self)
        self.taskInfos = JoinableQueue()

    def getq(self):
        return self.taskInfos

    def run(self):
        print("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                print("try to get queued task from %s" % self.taskInfos)
                msg, task = self.taskInfos.get()
                print("got task %s for msg: %s" % (task, msg))
                task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                print("Error: %s" % e.message)
        print("shutting down TaskExecutor!")

    def is_running(self):
        return True

class Task(object):
    def execute(self, msg):
        print(msg)

def produce(q):
    cnt = 0
    while True:
        q.put(("Message " + str(cnt), Task()))
        cnt += 1
        sleep(1)

if __name__ == "__main__":
    executor = TaskExecutor()
    executor.start()
    for i in range(4):
        p = Process(target=produce, args=(executor.getq(),))
        p.start()

如果__name__ == "__main__"部分不仅允许它在 Windows 上运行,它还具有很大的“文档”值(value),一目了然 executor确实在主程序中运行。

不过,您的问题是这是否是您想要的分工。你真的想让主进程——而且只有主进程——做所有的事情吗

   task.execute(msg)

工作?从这里无法猜测这是否是您想要的。这就是代码的作用。

样式点:请注意,这去掉了 schedule_task()方法。并行处理可能很困难,几十年来我发现保持线程间/进程间通信尽可能简单和明显是非常有值(value)的。这意味着,除其他外,直接使用消息队列而不是例如将它们隐藏在方法中。这种情况下的抽象层通常会使正确的代码更难创建、扩展、调试和维护。

关于python - 由于阻塞 Queue.get() 方法导致的死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20508479/

相关文章:

python - 使用 Python 正则表达式查找两个变量之间的 HTML

python-2.7 - pandas:根据第一个字符映射新值

python - 在python中生成间隔之间的月份列表

java - java中的多线程持久队列

java - 在队列中添加元素时,调用监听器来通知队列元素是可变的

ruby-on-rails - Resque .. 我怎样才能得到队列列表

python - 如何删除Python中的尾随空格?

python - 词典词典: Sorting by a specific key

python - Pygame 问题 : maybe something to do with dictionaries?

python - 如何在 python 2.7 中删除 ) ' 和 , ?