python-3.x - 如何使用 getattr 进行多线程,每个属性一个线程?

标签 python-3.x multithreading threadpool getattr

假设我有以下具有多个昂贵属性的对象,如下所示:

class Object:

  def __init__(self, num):
    self.num = num

  @property 
  def expensive_property(self):
    return expensive_calculation

  @property 
  def expensive_property1(self):
    return expensive_calculation

  @property 
  def expensive_property2(self):
    return expensive_calculation

注意:随着时间的推移,昂贵房产的数量可能会增加。
给定 Objects 的列表对于列表中的所有对象,我如何计算每个线程的每个昂贵属性。我很难弄清楚我应该如何安排我的游泳池。

这有点像我想要实现的目标:
from multithreading.dummy import Pool
from multithreading.dummy import Queue

object_list = [Object(i) for i in range(20)]
properties = [expensive_property2, expensive_propert5, expensive_property9, expensive_property3]


def get(obj, expensive_property):
  return [getattr(expensive_property, o) for o in obj]

tasks = Queue()
for p in properties :
  tasks.put((get, o, p))

results = []

with Pool(len(properties )) as pool:
   while True:
      task = tasks.get()
      if task is None:
        break
      func, *args = task
      result = pool.apply_async(func, args)
      results.append(result)

最佳答案

这有点疯狂,因为 apply_async有一个 internal queue在池中分配任务。我可以想象有理由围绕可观察性或背压设置另一个队列。你的例子是你的完整程序吗?还是您正在从不同的进程/线程排队工作?

如果您的计算受 CPU 限制,则一种选择可能是删除队列以使事情变得更简单:

def wait_all(async_results, timeout_seconds_per_task=1):
   for r in async_results:
      r.get(timeout_seconds)

wait_all(
  [pool.apply_async(get, (o, p)) for p in properties],
  timeout_seconds_per_task=1,
)

就像上面的示例一样,这允许您在可用的 CPU 上分配计算(池甚至默认为 cpus on your machine 的数量)。如果您的工作受 IO 限制(由您的 sleep 建议),则进程的 yield 可能会递减。

您必须进行基准测试,但对于 IO 绑定(bind),您可以使用相同的模式创建线程池 https://stackoverflow.com/a/3034000/594589

其他选项可能是使用带有事件循环的非阻塞 IO,例如 gevent 或 asyncio。两者都允许您对相同的基于池的模式进行建模!

关于python-3.x - 如何使用 getattr 进行多线程,每个属性一个线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53715322/

相关文章:

有界队列的 Java 线程池

python - 如何替换参数给定的字符串中的特定元素

C++ 线程安全延迟加载

python - 为什么 Python 和 wc 在字节数上不一致?

java - 蛇中的线程

java - 使用 websockets 的多线程

java - 等待 FutureTask 上的 cancel()

java - 如何强制执行单例(一个全局实例)ScheduledExecutorService,一次一项任务?

python - Python 3 中 Pyspark 的 takeOrdered 键错误

python - 如何在 5 分钟内使 Django session 过期?