python - 0mq一对多连接

标签 python multithreading zeromq

使用0mq建立进程间双向通信最正确的方法是什么?我需要创建几个后台进程来等待来自主进程的命令,执行一些计算并将结果返回给主进程。

最佳答案

有几种方法可以做到这一点。最直接的方法可能是使用 REQ/REP 套接字。每个后台进程/worker 都有一个 REP 套接字,您可以使用一个 REQ 套接字与它们通信:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

您必须连接到每个工作人员才能向他们发送消息并取回结果:

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()

另一种方法是使用 PUB/SUB 向工作人员发送消息并使用 PUSH/PULL 收获结果:

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

要向特定工作人员广播命令,您可以执行以下操作:

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])

拉入结果:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print worker_id, result

关于python - 0mq一对多连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7178384/

相关文章:

java - 使用 Linux 为 Java 应用程序设置最大创建线程数

c++ - 如何在 ZeroMQ(C++) 中使用 XPUB 和 XSUB 实现带有代理的 Pub-Sub 网络?

python - 在 Python 中拆分元组 - 最佳实践?

python-Pandas df.sum() 跨多列意外 arg 'axis' 错误

java - 线程 "Thread-8"java.lang.StackOverflowError...at java.util.Random.nextInt 中出现异常(来源未知)

python - 如何在Python中的线程中运行zeroRpc服务器?

java - Node.JS 和 Java PUB/SUB

python - 如何使用 Python CSV Writer 保留尾随零

python - 如何忽略第一屈服值?

Java Process.waitFor() 和 Readline 挂起