python - 处理来自多个进程的单个文件

标签 python multithreading multiprocessing

我有一个大文本文件,我想在其中处理每一行(做一些操作)并将它们存储在数据库中。由于单个简单程序花费的时间太长,我希望它通过多个进程或线程来完成。 每个线程/进程都应该从该单个文件中读取不同的数据(不同的行),并对它们的数据(行)进行一些操作并将它们放入数据库中,这样最后,我就可以处理所有的数据和我的数据库转储了我需要的数据。

但我无法弄清楚如何解决这个问题。

最佳答案

您正在寻找的是生产者/消费者模式

基本线程示例

这是一个使用 threading module 的基本示例(而不是多处理)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

您不会与线程共享文件对象。您可以通过提供 queue 为他们生产工作。与数据行。然后每个线程会拿起一行,处理它,然后在队列中返回它。

multiprocessing module 中内置了一些更先进的设施。共享数据,例如列表和 special kind of Queue .使用多处理与线程需要权衡取舍,这取决于您的工作是受 CPU 限制还是 IO 限制。

基本的 multiprocessing.Pool 示例

这是一个非常基本的多处理池示例

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

A Pool是管理自己的进程的便利对象。由于打开的文件可以遍历其行,因此您可以将其传递给 pool.map(),后者将遍历它并将行传递给工作函数。 Map阻塞并在完成后返回整个结果。请注意,这是一个过于简化的示例,并且 pool.map() 将在执行工作之前将您的整个文件一次全部读入内存。如果您希望有大文件,请记住这一点。有更高级的方法来设计生产者/消费者设置。

具有限制和线重新排序的手动“池”

这是 Pool.map 的手动示例,但不是一次性消耗整个迭代,您可以设置队列大小,以便您仅以尽可能快的速度逐个喂它。我还添加了行号,以便您以后可以跟踪它们并在需要时引用它们。

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)

关于python - 处理来自多个进程的单个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11196367/

相关文章:

javascript - python Flask 动态下拉列表

Python:检查数组是否没有所需数量的成员

Android:如何从 C++ 中的另一个线程读取 Assets

multithreading - 为什么这个 Haskell 程序在使用 -threaded 编译时执行异常?

multithreading - Boost Asio是否按顺序调用异步处理程序

python - 符合唯一性标准的相似项目总数

python - Openflow 多部分请求错误消息 : OFPBRC_BAD_LEN (6)

Python:如何为 multiprocessing.Pool 中的进程使用不同的日志文件?

sockets - 如何将网络套接字连接发送到 Tornado 中的不同进程?

python - 共享非连续访问 Numpy 数组