我有一些对象需要处理。我想知道是否有办法根据唯一键将工作(流程)分配给对象。
当代码第一次看到对象时,应该随机分配一个worker,但是如果该对象再次出现,则应该分配给之前处理该对象的worker。谢谢
例如:
worker A、B、C |第一束对象 1,2,3,4 第二束对象 1,3
第一堆对象:
worker A <--- 1,3
worker B <--- 2
worker C <--- 4
第二堆对象:
worker A <--- 1,3
worker B <---
worker C<---
最佳答案
实现“粘性 session ”的一个非常简单的方法是制作您自己的 multiprocessing.Pool
版本,它不会急切地分配工作项,而是确定性地分配它们。这是一个不完整但可运行的解决方案:
import multiprocessing
import os
import time
def work(job):
time.sleep(1)
print "I am process", os.getpid(), "processing job", job
class StickyPool:
def __init__(self, processes):
self._inqueues = [multiprocessing.Queue() for ii in range(processes)]
self._pool = [multiprocessing.Process(target=self._run, args=(self._inqueues[ii],)) for ii in range(processes)]
for process in self._pool:
process.start()
def map(self, fn, args):
for arg in args:
ii = hash(arg) % len(self._inqueues)
self._inqueues[ii].put((fn, arg))
def _run(self, queue):
while True:
fn, arg = queue.get()
fn(arg)
pool = StickyPool(3)
#pool = multiprocessing.Pool(3)
pool.map(work, [1,2,3,4,1,2,3,4,1,2,3,4])
time.sleep(4)
当使用上面的StickyPool
时,作业是根据其参数的哈希值来分配的。这意味着相同的参数每次都会进入相同的进程。如果有许多唯一值的哈希值发生冲突,那么它就不够聪明,无法均匀地分配作业,但是哦,好吧—— future 还有改进的空间。我也没有考虑关闭逻辑,因此如果您使用 StickyPool
,程序不会停止运行,但如果您使用 multiprocessing.Pool
,程序就会停止运行。修复这些问题并实现更多的 Pool
接口(interface)(如 apply()
和从 map()
返回结果)将作为练习.
关于python多处理池将对象分配给worker,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37164986/