python -> 多处理模块

标签 python queue multiprocessing

这是我想要完成的 -

  1. 我有大约一百万个文件,我需要解析这些文件并将解析后的内容附加到一个文件中。
  2. 由于单个过程需要很长时间,因此该选项已被淘汰。
  3. 不在 Python 中使用线程,因为它本质上是运行单个进程(由于 GIL)。
  4. 因此使用多处理模块。即生成 4 个子进程以利用所有原始核心功能:)

到目前为止一切顺利,现在我需要一个所有子进程都可以访问的共享对象。我正在使用多处理模块中的队列。此外,所有子流程都需要将其输出写入单个文件。我猜是一个可以使用 Locks 的地方。当我运行这个设置时,我没有收到任何错误(所以父进程看起来很好),它只是停止了。当我按下 ctrl-C 时,我看到一个回溯(每个子进程一个)。也没有输出写入输出文件。这是代码(请注意,没有多进程一切都运行良好)-

import os
import glob
from multiprocessing import Process, Queue, Pool

data_file  = open('out.txt', 'w+')

def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return

def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop

    # this is the block of code that needs correction.
    if multi_process:
        # One way to spawn 4 processes
        # pool = Pool(processes=4) #Start worker processes
        # res  = pool.apply_async(worker, [task_queue, data_file])

        # But I chose to do it like this for now.
        for i in range(4):
            proc = Process(target=worker, args=[task_queue])
            proc.start()
    else: # single process mode is working fine!
        worker(task_queue)
    data_file.close()
    return

我做错了什么?我还尝试在生成时将打开的 file_object 传递给每个进程。但没有效果。例如- Process(target=worker, args=[task_queue, data_file])。但这并没有改变什么。我觉得子进程由于某种原因无法写入文件。 file_object 的实例未被复制(在生成时)或其他一些怪癖......有人知道吗?

额外:还有什么方法可以保持持久的 mysql_connection 打开并将其传递给 sub_processes?因此,我在我的父进程中打开了一个 mysql 连接,打开的连接应该可供我所有的子进程访问。基本上这相当于 python 中的 shared_memory。这里有什么想法吗?

最佳答案

尽管与 Eric 的讨论富有成果,但后来我找到了更好的方法。在 multiprocessing 模块中,有一个名为“Pool”的方法非常适合我的需要。

它会根据我的系统拥有的内核数量进行 self 优化。即只产生与编号一样多的进程。的核心。当然这是可以定制的。所以这是代码。以后可能会帮助别人-

from multiprocessing import Pool

def main():
    po = Pool()
    for file in glob.glob('*.csv'):
        filepath = os.path.join(DATA_DIR, file)
        po.apply_async(mine_page, (filepath,), callback=save_data)
    po.close()
    po.join()
    file_ptr.close()

def mine_page(filepath):
    #do whatever it is that you want to do in a separate process.
    return data

def save_data(data):
    #data is a object. Store it in a file, mysql or...
    return

仍在经历这个巨大的模块。不确定 save_data() 是否由父进程执行,或者此函数是否由派生的子进程使用。如果是 child 进行保存,则在某些情况下可能会导致并发问题。如果有人有使用此模块的经验,您将在这里获得更多知识......

关于python -> 多处理模块,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3586723/

相关文章:

python - 嵌入基于Python的基于Web的外壳(Werkzeug?)

python - torch : "Model Weights not Changing"

python - 从操作系统创建 Pandas 数据框

python - Pandas IO SQL 和具有多个结果集的存储过程

c++ - 在 STL 列表中找到一对,其中只有第一个元素是已知的

google-app-engine - 在 App Engine 上排队电子邮件

python - 使用 PySide 和多处理卡住 GUI

ios - NSOperationQueue 内的 NSOperation 中的异步回调永远不会被调用

python - Python 中的 ProcessPoolExecutor 和 Lock

python multiprocess.Pool 在标准输出中按顺序显示结果