Python 多处理问题?

标签 python zeromq

我有一个包含 500 个输入文件的文件夹(所有文件的总大小约为 500[MB])。

我想编写一个执行以下操作的 python 脚本:

(1)将所有输入文件加载到内存

(2) 初始化一个空的 python 列表,稍后将使用该列表……参见项目符号 (4)

(3) 启动 15 个不同的(独立的)进程:每个进程都使用相同的输入数据 [来自 (1)] -- 但使用不同的算法来对其进行处理,从而产生不同的结果

(4) 我希望 [来自步骤 (3)] 的所有独立进程将它们的输出存储在同一个 python 中list [在步骤 (2) 中初始化的同一个列表]

一旦所有 15 个进程都完成了运行,我将得到一个包含所有 15 个独立进程结果的 python 列表

我的问题是,是否可以在 python 中高效地执行上述操作?如果是这样,您能否提供说明如何执行此操作的方案/示例代码?

注意#1:我将在强大的多核服务器上运行它;所以这里的目标是在所有独立进程之间共享一些内存 {input data, output list} 的同时使用所有处理能力。

注意 #2:我在 Linux 环境中工作

最佳答案

好的,我只是用 zeromq 搞定了这个向多个发布者展示单个订阅者。您可能对队列执行相同的操作,但您需要对它们进行更多管理。 zeromq 套接字可以正常工作,这使得它非常适合像 IMO 这样的事情。

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""
from multiprocessing import Process


class Worker(Process):
    def __init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    def run(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
        time.sleep(1)
        with open(self._filename) as my_input:
            for l in my_input.readlines():
                result_publisher.send(l)

if __name__ == '__main__':
    import sys
    import os
    import zmq

    #assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file to be processed if it exists pass
    # in a bind argument instructing the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \
               in enumerate((x for x in files if os.path.exists(x)))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # wire up subscriber to whatever the worker is bound to 
    for w in workers:
        print w._bind
        result_subscriber.connect(w._bind)

    # start workers
    for w in workers:
        print "starting workers..."
        w.start()

    result = []

    # read from the subscriber and add it to the result list as long
    # as at least one worker is alive
    while [w for w in workers if w.is_alive()]:
        result.append(result_subscriber.recv())
    else:
        # output the result
        print result

哦还有得到 zmq

$ pip install pyzmq-static

关于Python 多处理问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6542192/

相关文章:

python - 如何在 PySpark 中构建稀疏矩阵?

python - 在组内联合非集合迭代的有效方法

python - ZeroMQ 无法在 [0.0.0.0 :5555] - address already in use. 上的 Docker 上进行 .bind() 为什么?

node.js - 无法在Windows中通过npm安装 Node ZeroMQ

python - Zeromq:为什么经销商发送一条消息并从一个客户接收消息?

python - ImportError:无法从 'ModelFactory' 导入名称 'frlearn.base'

python - json.load() 和 json.loads() 函数有什么区别

python - ImportError:Python 2.7.13 中没有名为 _tkinter 的模块

node.js 回调被多次调用

python - 如何在函数内部(以非阻塞方式)使用 zmq 在客户端请求时获取函数的状态?