python - 队列和多处理

标签 python multithreading queue multiprocessing huffman-code

我正在编写一些代码来构建可变长度 (Huffman) 代码表,并且我想使用多处理模块来获得乐趣。这个想法是让每个进程尝试从队列中获取一个节点。他们在节点上工作,或者将节点的两个子节点放回工作队列,或者将可变长度代码放入结果队列。它们还将消息传递到消息队列,消息队列应由主进程中的线程打印。这是到目前为止的代码:

import Queue
import multiprocessing as mp
from threading import Thread
from collections import Counter, namedtuple

Node = namedtuple("Node", ["child1", "child2", "weight", "symbol", "code"])

def _sort_func(node):
    return node.weight

def _encode_proc(proc_number, work_queue, result_queue, message_queue):
    while True:
        try:
            #get a node from the work queue
            node = work_queue.get(timeout=0.1)
            #if it is an end node, add the symbol-code pair to the result queue
            if node.child1 == node.child2 == None:
                message_queue.put("Symbol processed! : proc%d" % proc_number)
                result_queue.put({node.symbol:node.code})
            #otherwise do some work and add some nodes to the work queue
            else:
                message_queue.put("More work to be done! : proc%d" % proc_number)
                node.child1.code.append(node.code + '0')
                node.child2.code.append(node.code + '1')
                work_queue.put(node.child1)
                work_queue.put(node.child2)
        except Queue.Empty: #everything is probably done
            return

def _reporter_thread(message_queue):
    while True:
        try:
            message = message_queue.get(timeout=0.1)
            print message
        except Queue.Empty: #everything is probably done
            return

def _encode_tree(tree, symbol_count):
    """Uses multiple processes to walk the tree and build the huffman codes."""
    #Create a manager to manage the queues, and a pool of workers.
    manager = mp.Manager()
    worker_pool = mp.Pool()
    #create the queues you will be using
    work = manager.Queue()
    results = manager.Queue()
    messages = manager.Queue()
    #add work to the work queue, and start the message printing thread
    work.put(tree)
    message_thread = Thread(target=_reporter_thread, args=(messages,))
    message_thread.start()
    #add the workers to the pool and close it
    for i in range(mp.cpu_count()):
        worker_pool.apply_async(_encode_proc, (i, work, results, messages))
    worker_pool.close()
    #get the results from the results queue, and update the table of codes
    table = {}
    while symbol_count > 0:
        try:
            processed_symbol = results.get(timeout=0.1)
            table.update(processed_symbol)
            symbol_count -= 1
        except Queue.Empty:
            print "WAI DERe NO SYMBOLzzzZzz!!!"
        finally:
            print "Symbols to process: %d" % symbol_count
    return table

def make_huffman_table(data):
    """
    data is an iterable containing the string that needs to be encoded.
    Returns a dictionary mapping symbols to codes.
    """
    #Build a list of Nodes out of the characters in data
    nodes = [Node(None, None, weight, symbol, bytearray()) for symbol, weight in Counter(data).items()]
    nodes.sort(reverse=True, key=_sort_func)
    symbols = len(nodes)
    append_node = nodes.append
    while len(nodes) > 1:
        #make a new node out of the two nodes with the lowest weight and add it to the list of nodes.
        child2, child1 = nodes.pop(), nodes.pop()
        new_node = Node(child1, child2, child1.weight+child2.weight, None, bytearray())
        append_node(new_node)
        #then resort the nodes
        nodes.sort(reverse=True, key=_sort_func)
    top_node = nodes[0]
    return _encode_tree(top_node, symbols)

def chars(fname):
    """
    A simple generator to make reading from files without loading them 
    totally into memory a simple task.
    """
    f = open(fname)
    char = f.read(1)
    while char != '':
        yield char
        char = f.read(1)
    f.close()
    raise StopIteration

if __name__ == "__main__":
    text = chars("romeo-and-juliet.txt")
    table = make_huffman_table(text)
    print table

当前的输出是:

More work to be done! : proc0
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92

它只是永远重复最后一点。在第一个进程向节点添加工作后,一切都停止了。这是为什么?我不理解/正确使用队列吗?抱歉阅读所有代码。

最佳答案

您的第一个问题是尝试使用超时。他们几乎从来都不是一个好主意。如果您不可能想出可靠的方法来有效地做某事,那么它们可能是个好主意,并且您仅将超时用作第一> 检查某事是否真的完成了。

也就是说,主要问题是 multiprocessing 通常非常不擅长报告工作进程中发生的异常。你的代码实际上在这里死了:

node.child1.code.append(node.code + '0')

您没有看到的错误消息是“需要大小为 1 的整数或字符串”。您不能将 bytearray 附加到 bytearray。你想做的:

node.child1.code.extend(node.code + '0')
                 ^^^^^^

相反,在 child2 的类似行中。照原样,因为第一个从工作队列中取出东西的工作进程死了,所以没有更多的东西被添加到工作队列中。这解释了你所看到的一切 - 到目前为止 ;-)

无超时

仅供引用,避免超时(不稳定 - 不可靠)的常用方法是在队列中放置一个特殊的标记值。消费者知道是时候在他们看到哨兵时退出,并使用普通的阻塞 .get() 从队列中检索项目。所以首先要做的是创建一个哨兵;例如,在顶部附近添加:

ALL_DONE = "all done"

最佳实践 .join() 线程和进程 - 这样主程序知道(不仅仅是猜猜)他们什么时候也完成了。

因此,您可以像这样更改 _encode_tree() 的结尾:

for i in range(1, symbol_count + 1):
    processed_symbol = results.get()
    table.update(processed_symbol)
    print "Symbols to process: %d" % (symbol_count - i)
for i in range(mp.cpu_count()):
    work.put(ALL_DONE)
worker_pool.join()
messages.put(ALL_DONE)
message_thread.join()
return table

这里的关键是主程序知道所有的工作都在当且仅当没有符号需要处理时才完成。在此之前,它可以无条件地从 results 队列中 .get() 结果。然后它将与 worker 数量相等的哨兵数量放入工作队列。他们每个人都会吃掉一个哨兵然后退出。然后我们等待他们完成(worker_pool.join())。然后将哨兵放入消息队列,我们​​等待该线程也结束。只有这样函数才会返回。

现在没有任何事情会提前终止,一切都会干净地关闭,最终表的输出不会再与工作人员和消息线程的各种其他输出混淆。 _reporter_thread() 被重写为:

def _reporter_thread(message_queue):
    while True:
        message = message_queue.get()
        if message == ALL_DONE:
            break
        else:
            print message

_encode_proc() 也类似。不再有超时或 try/except Queue.Empty: 摆弄。您甚至不必再导入 Queue :-)

关于python - 队列和多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20337532/

相关文章:

python - 如何将字典分配给另一个字典?

c++ - QT:无法从另一个线程启用套接字通知程序

java - JVM 在调用停止之前允许关闭 Hook 运行多长时间?

Java用持久化存储实现锁定机制

c++ -/boost/lockfree/queue.hpp: 错误: 静态断言失败: (boost::has_trivial_destructor<T>::value)

java - 任务队列java

python - Django:allow_tags和short_description如何工作?

python - 使用 Pandas 和 spaCy 提取句子嵌入特征

python - Django:测试加载静态文件是否成功

java - Java中的方法队列