python - 在多处理中使用共享列表的正确方法是什么

标签 python multiprocessing python-3.7

我已经实现了 共享列表 Manager, Lock 的帮助下,在 Python(3.7 版)中的多处理。我已将其用作使用多处理创建的进程之间的共享对象 Process函数调用。共享列表用于存储共享它的每个进程生成的值/对象。

的实现共享列表 ManagerLockmultiprocessing Python的

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            return list(self.results).copy()

创建的用法 共享列表 通过使用 multiprocessing 创建的多个进程来存储值
results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
   new_process = Process(target=child_function, args=(results))
   processes.append(new_process)
   new_process.start()

for _process in processes:
   _process.join()

for _process in processes:
   _process.close()
child_function的实现
while True:
  result = func()
  if not (results.append(result)):
     break

某些场景的实现有效,但是当我增加了限制时挂断了。
我用过的处理器数量少于CPU数量,做了同样的实验仍然卡在同一个位置。

有没有更好的方法来解决上述问题,我研究了不同的方法,例如使用队列,但没有按预期工作,挂断电话?

添加了使用队列的先前实现

使用队列实现
results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                            args=(tasks, results)
    processes.append(new_process)
    new_process.start()

sleep(5)
for i in range(limit):
    tasks.put(0)
sleep(1)

for i in range(num_processes):
    tasks.put(-1)

num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()

child_function
while True:
    task_val = tasks.get()
    if task_val < 0:
        results.put(-1)
        break
    else:
        result = func()
        results.put(result)

更新

在发布这个问题之前,我已经阅读了以下引用资料,但我无法获得所需的输出。我同意,这段代码导致了死锁状态,但我无法在 python 中使用多处理找到没有死锁的实现

引用文献
  • Multiprocessing of shared list
  • https://pymotw.com/2/multiprocessing/basics.html
  • Shared variable in python's multiprocessing
  • https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
  • https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba
  • http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/
  • python multiprocessing/threading cleanup

  • 根据建议,我能够修改 共享列表 使用 Queue
    class SharedList(object):
        def __init__(self, limit):
            self.manager = Manager()
            self.tasks = self.manager.Queue()
            self.results = self.manager.Queue()
            self.limit = limit
            self.no_of_process = min(process_count, limit)
    
        def setup(self):
            sleep(1)
            for i in range(self.limit):
                self.tasks.put(0)
            sleep(1)
            for i in range(self.no_of_process):
                self.tasks.put(-1)
    
        def append(self, new_value):
            task_val = self.tasks.get()
            if task_val < 0:
                self.results.put(-1)
                return False
            else:
                self.results.put(new_value)
                return True
    
        def list(self):
            results_out = []
            num_finished_processes = 0
            while True:
                new_result = self.results.get()
                if new_result == -1:
                    num_finished_processes += 1
                    if num_finished_processes == self.no_of_process:
                        break
                else:
                    results_out.append(new_result)
            return results_out
    

    此实现工作正常,具有以下实现更改
    results = SharedList(limit)
    num_processes = min(process_count, limit)
    processes = []
    for i in range(num_processes):
       new_process = Process(target=child_function, args=(results))
       processes.append(new_process)
       new_process.start()
    
    results.setup()
    
    for _process in processes:
       _process.join()
    
    for _process in processes:
       _process.close()
    
    child_function的实现
    while True:
      result = func()
      if not (results.append(result)):
         break
    

    但是,这又一次陷入僵局,在经过一些迭代后挂断了

    最佳答案

    我发现以下文章基于 ,听起来很有趣,而且很容易实现并行计算,既有效又省时

    https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8

    关于python - 在多处理中使用共享列表的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58927768/

    相关文章:

    python - 如何让 PySerial 接受 921600 波特率

    用于输出可由音频程序读取的 MIDI 文件或文本的 Python 接口(interface)

    python - 为什么在导入多处理时出现导入错误?

    python-3.7 - vtkCommonCorePython 在 Ubuntu 20 上的 python3-paraview 中丢失错误

    Python 正则表达式 - 替换

    Python dataclasses.dataclass 引用变量而不是实例变量

    python - 如何加快多处理队列的同时读写速度?

    python-3.x - Jupyter 笔记本永远不会使用多重处理完成处理(Python 3)

    python - 如何使用 Python 3.7 和 Anaconda 运行 Spyder

    python - 使用 asyncio 生成器和 asyncio.as_completed