python - ZeroMQ: load-balancing many workers and one master

Tags: python, zeromq

Suppose I have a master process that divides up data to be processed in parallel. Let's say there are 1000 chunks of data and 100 nodes on which to run the computations.

Is there some way to do REQ/REP so that all of the workers stay busy? I tried the load-balancer pattern from the guide, but with a single client, sock.recv() blocks until it receives the worker's response.
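The lockstep described here is enforced by the REQ socket's own state machine, not just by the blocking recv(). A minimal probe (a sketch assuming pyzmq; the inproc endpoint name is made up) shows that even a second send() is refused before the reply arrives:

```python
import zmq

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
router.bind("inproc://lockstep-demo")

req = ctx.socket(zmq.REQ)
req.connect("inproc://lockstep-demo")

req.send(b"first")        # accepted: the REQ socket now expects a reply
errno = None
try:
    req.send(b"second")   # a second send before recv() violates the REQ state machine
except zmq.ZMQError as err:
    errno = err.errno
print(errno == zmq.EFSM)  # True: "Operation cannot be accomplished in current state"
```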

Here is the code, slightly modified from the zmq guide's load balancer. It starts one client, 10 workers, and a load balancer/broker in the middle. How can I get all of the workers working at the same time?

from __future__ import print_function
from multiprocessing import Process
import zmq
import time
import uuid
import random

def client_task():
    """Basic request-reply client using REQ socket."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = uuid.uuid4().hex.encode()  # identity must be bytes in Python 3
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        socket.send(b'WORK')
        msg = socket.recv()
        print(msg)

def worker_task():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = uuid.uuid4().hex.encode()  # identity must be bytes in Python 3
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        time.sleep(random.randint(1, 4))  # simulate work
        socket.send_multipart([address, b"", b"OK : " + socket.identity])


def broker():
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()

def main():
    NUM_CLIENTS = 1
    NUM_WORKERS = 10
    # Start background tasks
    def start(task, *args):
        process = Process(target=task, args=args)
        process.start()
    start(broker)

    for i in range(NUM_CLIENTS):
        start(client_task)

    for i in range(NUM_WORKERS):
        start(worker_task)

if __name__ == "__main__":
    main()

Accepted Answer

I guess there are different ways to do this:

- You could, for example, launch all the requests from your single client using the threading module, e.g.:

import threading
import zmq

result_list = []  # Collect the results in a list for the example
rlock = threading.RLock()

def client_thread(client_url, request, i):
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)

    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)

    socket.send(request.encode())
    reply = socket.recv()

    with rlock:
        result_list.append((i, reply))
    return

def client_task():
    # tasks = list with all your tasks
    url_client = "ipc://frontend.ipc"
    threads = []
    for i in range(len(tasks)):
        thread = threading.Thread(target=client_thread,
                                    args=(url_client, tasks[i], i,))
        thread.start()
        threads.append(thread)
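The threads above still need to be joined before result_list can be read. Here is a self-contained, runnable variant of the same idea; note that the ROUTER below is only a stand-in echo broker (not the guide's load balancer), and the inproc endpoint name is made up for the example:

```python
import threading
import zmq

NUM_REQUESTS = 5
URL = "inproc://frontend-demo"   # inproc endpoints require a shared Context

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
router.bind(URL)                 # bind before any client connects

result_list = []
rlock = threading.RLock()

def echo_broker():
    # Answer each request once; in the real setup this is the load balancer.
    for _ in range(NUM_REQUESTS):
        ident, empty, request = router.recv_multipart()
        router.send_multipart([ident, b"", b"OK " + request])

def client_thread(i):
    socket = ctx.socket(zmq.REQ)
    socket.connect(URL)
    socket.send(b"WORK %d" % i)
    reply = socket.recv()
    socket.close()
    with rlock:
        result_list.append((i, reply))

broker = threading.Thread(target=echo_broker)
broker.start()
clients = [threading.Thread(target=client_thread, args=(i,))
           for i in range(NUM_REQUESTS)]
for t in clients:
    t.start()
for t in clients:
    t.join()
broker.join()
print(sorted(result_list))
```

Because each thread owns its own REQ socket, the per-socket send/recv lockstep no longer serializes the client as a whole.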

- You could take advantage of an event library such as asyncio (there is a submodule zmq.asyncio, and another library, aiozmq, the latter offering a higher level of abstraction). In that case you would still send your requests to the workers sequentially, but without blocking on each response (so the main loop is not kept busy), and you collect the results as they come back to the main loop. It could look like this:

import asyncio
import zmq.asyncio

async def client_async(request, context, i, client_url):
    """Basic client sending a request (REQ) to a ROUTER (the broker)"""
    socket = context.socket(zmq.REQ)
    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)
    await socket.send(request.encode())
    reply = await socket.recv()
    socket.close()
    return reply


async def run(loop):
    # tasks = list full of tasks
    url_client = "ipc://frontend.ipc"
    asyncio_tasks = []
    ctx = zmq.asyncio.Context()
    for i in range(len(tasks)):
        task = asyncio.ensure_future(client_async(tasks[i], ctx, i, url_client))
        asyncio_tasks.append(task)

    responses = await asyncio.gather(*asyncio_tasks)
    return responses

zmq.asyncio.install()  # needed only on old pyzmq; pyzmq >= 17 uses the standard event loop directly
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))
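On pyzmq >= 17 the install() call can be dropped, and the pattern condenses into a runnable sketch. As before, the echo broker and the inproc endpoint name below are stand-ins invented for the example, not the real load balancer:

```python
import asyncio
import zmq
import zmq.asyncio

URL = "inproc://frontend-async-demo"
NUM_REQUESTS = 5

async def echo_broker(ctx):
    # Stand-in for the load balancer: reply "OK <request>" to each client.
    router = ctx.socket(zmq.ROUTER)
    router.bind(URL)
    for _ in range(NUM_REQUESTS):
        ident, empty, request = await router.recv_multipart()
        await router.send_multipart([ident, b"", b"OK " + request])
    router.close()

async def client_async(ctx, i):
    socket = ctx.socket(zmq.REQ)
    socket.connect(URL)
    await socket.send(b"task %d" % i)
    reply = await socket.recv()
    socket.close()
    return reply

async def main():
    ctx = zmq.asyncio.Context()
    # Schedule the broker first so it binds before the clients connect.
    broker = asyncio.ensure_future(echo_broker(ctx))
    replies = await asyncio.gather(
        *(client_async(ctx, i) for i in range(NUM_REQUESTS)))
    await broker
    ctx.destroy()
    return replies

replies = asyncio.run(main())
print(replies)
```

asyncio.gather() preserves the order of its arguments, so replies come back indexed by client even though the requests were serviced concurrently.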

I haven't tested either of these snippets, but they both come from code of mine (modified to fit your question) where I used zmq in a configuration similar to yours.
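A third option, not shown in the answer above: replace the client's REQ socket with a DEALER, which drops the strict send/recv lockstep entirely and lets a single client pipeline all its requests before collecting any replies. A minimal single-threaded sketch against a stand-in ROUTER (the endpoint name is made up for the example):

```python
import zmq

ctx = zmq.Context.instance()
url = "inproc://dealer-demo"

router = ctx.socket(zmq.ROUTER)   # stand-in for the broker frontend
router.bind(url)

dealer = ctx.socket(zmq.DEALER)
dealer.connect(url)

n = 5
# DEALER has no lockstep: queue every request up front.
for i in range(n):
    dealer.send_multipart([b"", b"WORK %d" % i])  # empty frame mimics the REQ envelope

# Service the requests (the broker/workers would do this in the real setup).
for _ in range(n):
    ident, empty, request = router.recv_multipart()
    router.send_multipart([ident, b"", b"OK " + request])

replies = [dealer.recv_multipart()[1] for _ in range(n)]
print(replies)
```

The cost of DEALER is that replies are no longer matched to requests by the socket, so the client must correlate them itself (here trivially, by order).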

The original question can be found on Stack Overflow: https://stackoverflow.com/questions/39862022/
