假设我有一个主进程,它划分要并行处理的数据。假设有 1000 个数据 block 和 100 个节点用于运行计算。
是否有某种方法可以执行 REQ/REP 以使所有工作人员都忙碌?我尝试使用指南中的负载平衡器模式,但对于单个客户端,sock.recv()
将阻塞,直到它收到工作人员的响应。
这是代码,根据负载均衡器的 zmq 指南稍作修改。是在中间启动一个客户端、10 个工作人员和一个负载平衡器/代理。我怎样才能让所有这些 worker 同时工作???
from __future__ import print_function
from multiprocessing import Process
import zmq
import time
import uuid
import random
def client_task():
"""Basic request-reply client using REQ socket."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = str(uuid.uuid4())
socket.connect("ipc://frontend.ipc")
# Send request, get reply
for i in range(100):
print("SENDING: ", i)
socket.send('WORK')
msg = socket.recv()
print(msg)
def worker_task():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = str(uuid.uuid4())
socket.connect("ipc://backend.ipc")
# Tell broker we're ready for work
socket.send(b"READY")
while True:
address, empty, request = socket.recv_multipart()
time.sleep(random.randint(1, 4))
socket.send_multipart([address, b"", b"OK : " + str(socket.identity)])
def broker():
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("ipc://frontend.ipc")
backend = context.socket(zmq.ROUTER)
backend.bind("ipc://backend.ipc")
# Initialize main loop state
workers = []
poller = zmq.Poller()
# Only poll for requests from backend until workers are available
poller.register(backend, zmq.POLLIN)
while True:
sockets = dict(poller.poll())
if backend in sockets:
# Handle worker activity on the backend
request = backend.recv_multipart()
worker, empty, client = request[:3]
if not workers:
# Poll for clients now that a worker is available
poller.register(frontend, zmq.POLLIN)
workers.append(worker)
if client != b"READY" and len(request) > 3:
# If client reply, send rest back to frontend
empty, reply = request[3:]
frontend.send_multipart([client, b"", reply])
if frontend in sockets:
# Get next client request, route to last-used worker
client, empty, request = frontend.recv_multipart()
worker = workers.pop(0)
backend.send_multipart([worker, b"", client, b"", request])
if not workers:
# Don't poll clients if no workers are available
poller.unregister(frontend)
# Clean up
backend.close()
frontend.close()
context.term()
def main():
NUM_CLIENTS = 1
NUM_WORKERS = 10
# Start background tasks
def start(task, *args):
process = Process(target=task, args=args)
process.start()
start(broker)
for i in range(NUM_CLIENTS):
start(client_task)
for i in range(NUM_WORKERS):
start(worker_task)
# Process(target=broker).start()
if __name__ == "__main__":
main()
最佳答案
我想有不同的方法可以做到这一点:
-例如,您可以使用 threading
模块从您的单个客户端启动所有请求,例如:
result_list = [] # Add the result to a list for the example
rlock = threading.RLock()
def client_thread(client_url, request, i):
context = zmq.Context.instance()
socket = context.socket(zmq.REQ)
socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
socket.connect(client_url)
socket.send(request.encode())
reply = socket.recv()
with rlock:
result_list.append((i, reply))
return
def client_task():
# tasks = list with all your tasks
url_client = "ipc://frontend.ipc"
threads = []
for i in range(len(tasks)):
thread = threading.Thread(target=client_thread,
args=(url_client, tasks[i], i,))
thread.start()
threads.append(thread)
-您可以利用像 asyncio
这样的事件库(有一个子模块 zmq.asyncio 和另一个库 aiozmq,最后一个提供更高的抽象层次)。在这种情况下,您也将按顺序向工作人员发送请求,但不会对每个响应进行阻塞(因此不会使主循环忙碌)并在他们返回主循环时获取结果。这可能看起来像这样:
import asyncio
import zmq.asyncio
async def client_async(request, context, i, client_url):
"""Basic client sending a request (REQ) to a ROUTER (the broker)"""
socket = context.socket(zmq.REQ)
socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
socket.connect(client_url)
await socket.send(request.encode())
reply = await socket.recv()
socket.close()
return reply
async def run(loop):
# tasks = list full of tasks
url_client = "ipc://frontend.ipc"
asyncio_tasks = []
ctx = zmq.asyncio.Context()
for i in range(len(tasks)):
task = asyncio.ensure_future(client_async(tasks[i], ctx, i, url_client))
asyncio_tasks.append(task)
responses = await asyncio.gather(*asyncio_tasks)
return responses
zmq.asyncio.install()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))
我没有测试这两个片段,但它们都来 self 使用 zmq 的代码(经过修改以适应问题),配置与您的问题类似。
关于python - ZeroMQ:负载平衡许多 worker 和一个主人,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39862022/