python - Shutdown manager error "AttributeError: 'ForkAwareLocal' object has no attribute 'connection'" when using namespace and shared memory dict

Tags: python python-3.x multiprocessing

I am trying to:

  1. Share a dataframe between processes
  2. Update a shared dict based on calculations performed on (but not changing) that dataframe

I am using multiprocessing.Manager() to create a dict in shared memory (to store results) and a Namespace to store/share the dataframe I want to read from.

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    n = 100
    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

    print(shared_dict[1])

When running the code above, it writes to shared_dict correctly, as my print statement executes with some data in it. I also get an error message regarding the manager:

Process Process-88:
Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 788, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/henrysorsky/Library/Preferences/PyCharm2019.2/scratches/scratch_13.py", line 34, in edit_df_in_shared_dict
    row_to_insert = namespace.df.loc[ind]
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 1099, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 792, in _callmethod
    self._connect()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 779, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

I understand this is coming from the manager and seems to be because it is not shutting down properly. The only similar issue I can find online:

Share list between process in python server

suggests joining all the child processes, which I am already doing.

Best Answer

So after a full night's sleep I realised it was actually the reading of the dataframe in shared memory that was causing issues, and that at around the 20th child process, some of those reads were failing. I added a maximum number of processes that can run at once and this solved it.

For anyone wondering, the code I used is:

import multiprocessing

import pandas as pd
import numpy as np

def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    # region define inputs

    max_jobs_running = 4
    n = 100

    # endregion

    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    jobs_running = 0
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

        jobs_running += 1

        if jobs_running >= max_jobs_running:
            # Hacky throttle: busy-wait, recounting live processes
            # until a slot frees up
            while jobs_running >= max_jobs_running:
                jobs_running = 0
                for p in jobs:
                    jobs_running += p.is_alive()

    for p in jobs:
        p.join()

    for key, value in shared_dict.items():
        print(f"key: {key}")
        print(f"value: {value}")
        print("-" * 50)

This is probably better handled by a Queue and Pool setup rather than my hacky fix.

Regarding python - Shutdown manager error "AttributeError: 'ForkAwareLocal' object has no attribute 'connection'" when using namespace and shared memory dict, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60049527/
