python多处理池将对象分配给worker

标签 python

我有一些对象需要处理。我想知道是否有办法根据唯一键将工作(流程)分配给对象。
当代码第一次看到对象时,应该随机分配一个worker,但是如果该对象再次出现,则应该分配给之前处理该对象的worker。谢谢

例如:
worker A、B、C |第一束对象 1,2,3,4 第二束对象 1,3
第一堆对象:
worker A <--- 1,3
worker B <--- 2
worker C <--- 4
第二堆对象:
worker A <--- 1,3
worker B <---
worker C<---

最佳答案

实现“粘性 session ”的一个非常简单的方法是制作您自己的 multiprocessing.Pool 版本,它不会急切地分配工作项,而是确定性地分配它们。这是一个不完整但可运行的解决方案:

import multiprocessing
import os
import time

def work(job):
    time.sleep(1)
    print "I am process", os.getpid(), "processing job", job

class StickyPool:
    def __init__(self, processes):
        self._inqueues = [multiprocessing.Queue() for ii in range(processes)]
        self._pool = [multiprocessing.Process(target=self._run, args=(self._inqueues[ii],)) for ii in range(processes)]
        for process in self._pool:
            process.start()

    def map(self, fn, args):
        for arg in args:
            ii = hash(arg) % len(self._inqueues)
            self._inqueues[ii].put((fn, arg))

    def _run(self, queue):
        while True:
            fn, arg = queue.get()
            fn(arg)

pool = StickyPool(3)
#pool = multiprocessing.Pool(3)                                                                                         

pool.map(work, [1,2,3,4,1,2,3,4,1,2,3,4])
time.sleep(4)

当使用上面的StickyPool时,作业是根据其参数的哈希值来分配的。这意味着相同的参数每次都会进入相同的进程。如果有许多唯一值的哈希值发生冲突,那么它就不够聪明,无法均匀地分配作业,但是哦,好吧—— future 还有改进的空间。我也没有考虑关闭逻辑,因此如果您使用 StickyPool,程序不会停止运行,但如果您使用 multiprocessing.Pool,程序就会停止运行。修复这些问题并实现更多的 Pool 接口(interface)(如 apply() 和从 map() 返回结果)将作为练习.

关于python多处理池将对象分配给worker,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37164986/

相关文章:

python - 无法用 pickle 序列化 pygame.Surface 对象

python - 巨大的排列对象集(Python 或 R 中)

Python Beautifulsoup 获取属性值

python - 如何从 Django GeoIP 中的纬度和经度获取地址?

python - python 嵌套类变量

python - boto3 wait_until_running 不能按预期工作

python - Spark 中的分组和标准化

python - 如果满足条件,从数组中减去一个数字python

python - 如何遍历键为对的字典?

python - 在Python中将字符串转换为二进制整数