python - 多处理策略-共享嵌套对象

标签 python multiprocessing simulation

我正在研究一个项目,想应用并行化来提高执行速度。我以前使用过multiprocessing库,但仅用于数字运算。我将尝试简要描述我的设置和目标。我主要希望从对多处理概念有丰富经验的人们那里得到一个想法。

项目:

该项目是一个多echolon供应链仿真(多级分销网络),其中,根据传入的需求在每个位置定期做出重新订购决策。一个玩具示例如下所示:

  Level 3               Level 2                 Level 1             Level 0

                                         --- Local Warehouse 1
                                        |
             --- Central Warehouse 1 --
            |                           |
            |                            --- Local Warehouse 2
            |
Supplier --                                                        Customer
            |                            --- Local Warehouse 3
            |                           |
             --- Central Warehouse 2 --
                                        |
                                         --- Local Warehouse 4

模拟对象(简化)如下:
class Simulation:
  self.locations = dict() #List of locations
  self.customer = Customer() #Object periodically ordering at deepest level (Local) Warehouses
  self.levels = {0: [], 1:[],..} # Locations by depth in network graph
  def run(self):
    for period in simulation_length:
      for level in self.levels:
        for location in level:
          #review orders and issue order if required

class Location:
  self.orders = [] #list of received orders
  def review(self):
     #Decides based on received orders if reorder required
  def order(self, order, other_location):
       simulation.locations[other_location].orders.append(order)

因此该过程如下所示:
  • 客户(级别0)向本地仓库(级别1)发出订单
  • 本地仓库(级别1)检查订单并向中央仓库(级别2)发布订单
  • 依此类推,直到供应商
  • 下一个时期

  • 我的问题/想法

    现在,我拥有属于供应链特定级别的所有仓库的dict,并且我按顺序遍历了每个级别中的每个仓库(因此满足了依赖性)。
    级别数有限,但是每个级别的仓库数非常大,并且检查逻辑可能需要大量计算,因此我的计划是并行检查属于同一级别的所有仓库

    但是,由于某个位置使用函数order(self, order, other_location)来访问模拟对象内另一个对象的属性,因此我需要在进程之间共享整个模拟对象

    想法和方法:
  • 在下订单时将sumulation object放入shared memory中,并在对象上使用Lock(在评论中的所有其他操作纯粹是读取操作)
  • 而不是直接下达订单,而是将它们以Queue的形式放置到主流程中,并在返回某个层级后的所有仓库之后,只需执行订单功能(计算价格低廉)

  • (1)问题:

    根据我的所有研究,只能将CType对象ValueArray放在共享内存中。我不知道怎么办。我唯一读的是multiprocessing Manager,但是另一个stackoverflow问题Link说,它不适用于嵌套对象。

    (2)的问题:

    由于每个仓库对象在各个期间之间都发生变化(订单到达,库存更改,..),我必须将仓库对象移交给每个期间的流程,以使其处于最新状态,这会产生较大的间接费用(至少我认为是这样)

    结论

    我希望清楚我要达到的目标。对我这方面的任何误会的任何暗示,澄清或纠正都将是巨大的!

    编辑有关@ Roy12的回答:

    感谢您的回答。我一定会看一下Dask,因为最终目的是利用集群。
    关于第一个提示,我想到了两个实现,感谢您的建议:
    我的位置需要接收和发送订单对象,发送部分由对象本身控制,而不是接收对象。因此,对我来说选项1是
  • 在时段开始时,使用最新位置对象生成的进程会执行计算,而不是直接发送订单,而是将其放入队列并关闭进程。完成整个关卡后,主流程将分发订单并生成下一关卡的流程,依此类推。
    这导致定期生成和关闭过程,并且取决于模拟长度,位置对象变得相当大
  • 我将位置静态映射到开头的流程,并具有传入队列和传出队列,并让mainprocess进行订单的分发,例如流程1(位置1)向流程2(位置2)发送订单->流程1->主流程->流程2。在这种情况下,每次处理订单时都需要给该流程一个信号并执行例程(读取队列->重新计算->将订单发送到队列)

  • (2)对我来说似乎更老练,但是我没有缺点,否则必须对收集进行编程。如果重要的话,则订单对象的大小约为40字节,而位置对象(仓库)在整个运行过程中会增长到约15 mb

    最佳答案

    一个很好的用例。一些想法/建议:

  • 不要使用共享内存。这些天被认为是不好的做法。人们过去曾经使用共享内存进行并发,但是现代的方法是尽可能避免这种情况。例如,Go语言提供了一些不错的替代方法(请参阅https://blog.golang.org/codelab-share)。共享内存的另一个缺点是您不能在多台计算机上分配您的工作。
  • 使用队列通常更好。如果您要回退的数据以及过程之间的数据不是很大-许多(很多)兆字节-开销可以忽略不计。
  • 对于您的用例,您可能需要考虑使用诸如Dask之类的分布式计算框架。它提供了简单的方法来收集子任务的结果,然后才开始在层次结构中的下一个层次上工作。此外,它使您可以将工作分布在整个群集中,而不仅仅是一台机器上。

  • 希望这可以帮助。

    更新一些比例数据:

    该问题指出位置的大小为15MB,订单的大小为〜40个字节(小得多)。

    鉴于此,很明显,如果我们针对低网络流量进行优化,我们将选择模型1,其中每个位置都是整个模拟过程中一直存在的过程,并与其他位置进行通信以查看队列和消息。

    但是-很大,但是-通过队列运行所有通信似乎是一个更复杂的实现。创建具有15MB数据的进程应该花费不到一秒钟的时间。如果每个位置的计算都很重要,那么它可能比流程创建本身需要更多的时间。因此,我可能会从更简单的实现开始(为每个位置产生一个新的过程)。

    换句话说,围绕队列构建整个系统似乎有些过早的优化。

    最后一点:有一个名为SimPy的Python仿真包。我不知道它的可扩展性,但是值得一看。

    关于python - 多处理策略-共享嵌套对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61758267/

    相关文章:

    python - 如何在 Mac OS X 10.6 (Snow Leopard) 上安装 EasyGUI?

    module - 为什么多处理 Julia 会破坏我的模块导入?

    multithreading - 多核到单核仿真(核心分组)?

    python - 多处理列表管理器

    performance - 制作更高效的蒙特卡罗模拟

    Python 绘制多项式

    python - 更改 attrs 中卡住类的属性

    python - GPIB 上的仪器使用 PyVISA 无响应

    python - 为什么我的天文模拟不准确?

    python - Python中的分子动力学模拟