假设我有以下具有多个昂贵属性的对象,如下所示:
class Object:
def __init__(self, num):
self.num = num
@property
def expensive_property(self):
return expensive_calculation
@property
def expensive_property1(self):
return expensive_calculation
@property
def expensive_property2(self):
return expensive_calculation
注意:随着时间的推移,昂贵房产的数量可能会增加。
给定
Objects
的列表对于列表中的所有对象,我如何计算每个线程的每个昂贵属性。我很难弄清楚我应该如何安排我的游泳池。这有点像我想要实现的目标:
from multithreading.dummy import Pool
from multithreading.dummy import Queue
object_list = [Object(i) for i in range(20)]
properties = [expensive_property2, expensive_propert5, expensive_property9, expensive_property3]
def get(obj, expensive_property):
return [getattr(expensive_property, o) for o in obj]
tasks = Queue()
for p in properties :
tasks.put((get, o, p))
results = []
with Pool(len(properties )) as pool:
while True:
task = tasks.get()
if task is None:
break
func, *args = task
result = pool.apply_async(func, args)
results.append(result)
最佳答案
这有点疯狂,因为 apply_async
有一个 internal queue在池中分配任务。我可以想象有理由围绕可观察性或背压设置另一个队列。你的例子是你的完整程序吗?还是您正在从不同的进程/线程排队工作?
如果您的计算受 CPU 限制,则一种选择可能是删除队列以使事情变得更简单:
def wait_all(async_results, timeout_seconds_per_task=1):
for r in async_results:
r.get(timeout_seconds)
wait_all(
[pool.apply_async(get, (o, p)) for p in properties],
timeout_seconds_per_task=1,
)
就像上面的示例一样,这允许您在可用的 CPU 上分配计算(池甚至默认为 cpus on your machine 的数量)。如果您的工作受 IO 限制(由您的 sleep 建议),则进程的 yield 可能会递减。
您必须进行基准测试,但对于 IO 绑定(bind),您可以使用相同的模式创建线程池 https://stackoverflow.com/a/3034000/594589
其他选项可能是使用带有事件循环的非阻塞 IO,例如 gevent 或 asyncio。两者都允许您对相同的基于池的模式进行建模!
关于python-3.x - 如何使用 getattr 进行多线程,每个属性一个线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53715322/