python - JSON serializable object errors in a multiprocessing call - TypeError: XXX objects not callable error

Tags: python multiprocessing pickle jsonserializer jsonpickle

I am using a JSON serializer helper function to easily access dictionary objects (basically received as JSON).
jsondict.py

"""Utilities for working with JSON and json-like structures - deeply nested Python dicts and lists

This lets us iterate over child nodes and access elements with a dot-notation.
"""
import sys
isPy3 = sys.version_info[0]==3
if isPy3:
    def __alt_str__(v,enc='utf8'):
        return v if isinstance(v,bytes) else v.encode(enc)
    __strTypes__ = (str,bytes)
else:
    __alt_str__ = unicode
    __strTypes__ = (str,unicode)

class MyLocals(object):
    pass
mylocals = MyLocals()

def setErrorCollect(collect):
    mylocals.error_collect = collect

setErrorCollect(False)

def errorValue(x):
    if isinstance(x,__strTypes__):
         return repr(x) if ' ' in x else x
    return 'None' if x is None else str(x)
def condJSON(v,__name__=''):
    return JSONDict(v,__name__=__name__) if isinstance(v,dict) else JSONList(v,__name__=__name__) if isinstance(v,list) else v

def condJSONSafe(v,__name__=''):
    return JSONDictSafe(v,__name__=__name__) if isinstance(v,dict) else JSONListSafe(v,__name__=__name__) if isinstance(v,list) else v

class JSONListIter(object):
    def __init__(self, lst, conv):
        self.lst = lst
        self.i = -1
        self.conv = conv

    def __iter__(self):
        return self

    def next(self):
        if self.i<len(self.lst)-1:
            self.i += 1         
            return self.conv(self.lst[self.i])
        else:
            raise StopIteration

    if isPy3:
        __next__ = next
        del next

class JSONList(list):
    def __init__(self,v,__name__=''):
        list.__init__(self,v)
        self.__name__ = __name__
    def __getitem__(self,x):
        return condJSON(list.__getitem__(self,x),__name__='%s\t%s'%(self.__name__,errorValue(x)))
    def __iter__(self):
        return JSONListIter(self,condJSON)

class JSONListSafe(JSONList):
    def __getitem__(self,x):
        __name__='%s\t%s'%(self.__name__,errorValue(x))
        try:
            return condJSONSafe(list.__getitem__(self,x),__name__=__name__)
        except:
            if mylocals.error_collect:
                mylocals.error_collect(__name__)
            return JSONStrSafe('')
    def __iter__(self):
        return JSONListIter(self,condJSONSafe)

class JSONStrSafe(str):
    def __getattr__(self, attr):
        return self
    __getitem__ = __getattr__


class JSONDict(dict):
    "Allows dotted access"
    def __new__(cls,*args,**kwds):
        __name__ = kwds.pop('__name__')
        self = dict.__new__(cls,*args,**kwds)
        self.__name__ = __name__
        return self

    def __init__(self,*args,**kwds):
        kwds.pop('__name__','')
        dict.__init__(self,*args,**kwds)

    def __getattr__(self, attr, default=None):
        if attr in self:
            return condJSON(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif __alt_str__(attr) in self:
            return condJSON(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif attr=='__safe__':
            return JSONDictSafe(self,__name__=self.__name__)
        else:
            raise AttributeError("No attribute or key named '%s'" % attr)

    def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
        if accept or reject:
            if not accept:
                f = lambda i: not reject(i)
            elif not reject:
                f = accept
            else: #both
                f = lambda i: accept(i) and not reject(i)
            return sorted(((k,condJSON(v,__name__=k)) for k,v in self.items() if f((k,v))))
        else:
            return sorted(((k,condJSON(v,__name__=k)) for k,v in self.items()))

    def sorted_keys(self):
        return sorted(self.keys())

class JSONDictSafe(JSONDict):
    "Allows dotted access"
    def __getattr__(self, attr, default=None):
        if attr in self:
            return condJSONSafe(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif __alt_str__(attr) in self:
            return condJSONSafe(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif attr=='__safe__':
            return self
        else:
            return JSONStrSafe('')

    def __getitem__(self,x):
        __name__='%s\t%s'%(self.__name__,errorValue(x))
        try:
            return condJSONSafe(dict.__getitem__(self,x),__name__=__name__)
        except KeyError:
            if mylocals.error_collect:
                mylocals.error_collect(__name__)
            return JSONStrSafe('')

    def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
        if accept or reject:
            if not accept:
                f = lambda i: not reject(i)
            elif not reject:
                f = accept
            else: #both
                f = lambda i: accept(i) and not reject(i)
            return sorted(((k,condJSONSafe(v,__name__=k)) for k,v in self.items() if f((k,v))))
        else:
            return sorted(((k,condJSONSafe(v,__name__=k)) for k,v in self.items()))
If a JSON object is passed in as below:
data = {'name': 'john', 'age': 20, 'address': {'city':'xyz', 'country':'XZ', 'zip': 1223}}

json_obj = condJSONSafe(data)
I am able to access the data with dot notation:
print(json_obj.name) --> john
print(json_obj.address.country) --> XZ
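
As a side note on the "Safe" variants: with condJSONSafe, missing keys do not raise; they fall back to an empty JSONStrSafe. A quick illustration of that behaviour (missing_key and deeper are made-up names, not part of the data above):

print(json_obj.address.missing_key)         # prints an empty string (a JSONStrSafe) instead of raising
print(json_obj.address.missing_key.deeper)  # JSONStrSafe returns itself for any attribute, so this never fails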
It was working well until I implemented multiprocessing in my code to improve performance.
I extracted a certain amount of data from the JSON (after making it dot-notation accessible with the helper above) and stored it in separate lists, such as lists a, b and c.
Then I passed them into multiprocessing:
with mp.Pool(processes=mp.cpu_count()) as pool:
    res = pool.starmap(self.process_records, zip(self.a, self.b, self.c))
pool.join()
and it ended with:
TypeError: 'JSONStrSafe' object is not callable
I tried this answer, but it did not work for me. Any help is appreciated. Thanks in advance.
EDIT:
Reproducible example:
Test file:
import jsondict
import multiprocessing as mp
import itertools

def process_records(data, metadata):
    print(data.name)
    print(metadata)
    #code to requirement


if __name__ == '__main__':
    data = {
        "metadata": "test_data",
        "cust_list": [
            {
                'name': 'john', 
                'age': 20, 
                'address': {
                    'city':'xyz', 
                    'country':'XZ', 
                    'zip': 1223
                }
            },
            {
                'name': 'michal', 
                'age': 25, 
                'address': {
                    'city':'abc', 
                    'country':'CX', 
                    'zip': 3435
                }
            },
            {
                'name': 'david', 
                'age': 30, 
                'address': {
                    'city':'mnl', 
                    'country':'TD', 
                    'zip': 6767
                }
            }
        ]
    }

    json_obj = jsondict.condJSONSafe(data)

    print(json_obj.metadata) #will print 'test_data'
    print(json_obj.cust_list[0].name) #will print 'john'
    print(json_obj.cust_list[2].address.city) #will print 'mnl'


    with mp.Pool(processes=mp.cpu_count()) as pool:
        res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
        #res = pool.map(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) --> not working
        #res = [pool.apply_async(process_records, d, json_obj.metadata) for d in json_obj.cust_list] --> not working
        #apply --> not working
    pool.join()
Output:
test_data
john
mnl
Traceback (most recent call last):
  File "c:/Users/mohanlal/Desktop/Mock/json_err/test_app.py", line 53, in <module>
    res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 268, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 608, in get
    raise self._value
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 385, in _handle_tasks
    put(task)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: 'JSONStrSafe' object is not callable
I tried starmap, map, apply_async and apply, and got the same error each time.
I also tried the solution given in the similar question linked above, modifying the class where this error occurs as follows:
import re
dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")

class JSONStrSafe(str):
    def __getattr__(self, attr):
        if dunder_pattern.match(attr) or protected_pattern.match(attr):
            return super().__getattr__(attr)
        return self

    def __getstate__(self): return self.__dict__
    def __setstate__(self, d): self.__dict__.update(d)

    __getitem__ = __getattr__
But the problem still persists.
As suggested in the comments, I made the change in all 3 __getattr__ places and tried again. I got a different error, as below:
Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
    return _ForkingPickler.loads(res)
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
    __name__ = kwds.pop('__name__')
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: '__name__'
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
    return _ForkingPickler.loads(res)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
    __name__ = kwds.pop('__name__')
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
KeyError: '__name__'
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
    return _ForkingPickler.loads(res)
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
    __name__ = kwds.pop('__name__')
KeyError: '__name__'

Best Answer

The problem is that you are, forgive the pun, "in a pickle": you have a pickling problem. When you use multiprocessing, the arguments to your worker function/method get pickled. Usually the defaults used to serialize and de-serialize state are fine, but not in your case. See Pickling Class Instances. The default save and load operations for serializing and de-serializing an object are:

def save(obj):
    return (obj.__class__, obj.__dict__)

def load(cls, attributes):
    obj = cls.__new__(cls)
    obj.__dict__.update(attributes)
    return obj
Note that when an object is de-serialized, its __init__ method is not called; its __new__ method is, and therein lies the problem. I had to modify the __new__ method of your JSONDict class so that it recognizes when it is being called as part of de-serialization, in which case the '__name__' keyword argument may be absent, and I then had to add custom __getstate__ and __setstate__ methods to the class that override the default way it saves and restores the object's attributes (the __init__ method remains unchanged):
class JSONDict(dict):
    "Allows dotted access"
    def __new__(cls,*args,**kwds):
        self = dict.__new__(cls,*args,**kwds)
        if kwds and '__name__' in kwds:
            __name__ = kwds.pop('__name__')
            self.__name__ = __name__
        return self

    def __init__(self,*args,**kwds):
        kwds.pop('__name__','')
        dict.__init__(self,*args,**kwds)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__ = d


    """ The other methods remain unmodified """
This prints:
test_data
john
mnl
john
test_data
michal
david
test_data
test_data
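
To illustrate the point above that unpickling calls __new__ but not __init__, here is a minimal standalone sketch (the Demo class is purely illustrative and not part of the code in question):

import pickle

# Illustrative class only: shows which hooks pickle uses when
# reconstructing an instance with protocol 2+.
class Demo:
    def __new__(cls, *args, **kwds):
        print("__new__ called")
        return super().__new__(cls)

    def __init__(self):
        print("__init__ called")
        self.value = 42

d = Demo()                                # prints "__new__ called" then "__init__ called"
restored = pickle.loads(pickle.dumps(d))  # prints only "__new__ called"
print(restored.value)                     # 42: state restored from __dict__, not via __init__

This is also why, in the traceback shown in the question, JSONDict.__new__ failed with KeyError: '__name__': during unpickling it is invoked without the '__name__' keyword argument.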
Update
I was scratching my head trying to figure out why it was necessary to provide the __getstate__ and __setstate__ pickle methods, since they should be the default actions anyway. If you modify the program simply to test pickling, without even running the Pool methods, by inserting the following line:
json_obj = condJSONSafe(data)
# insert this line:
import pickle, sys; print(pickle.dumps(json_obj)); sys.exit(0)
it prints:
Traceback (most recent call last):
  File "test.py", line 205, in <module>
    import pickle;  print('pickle'); print(pickle.dumps(json_obj)); sys.exit(0)
TypeError: 'JSONStrSafe' object is not callable
After adding print statements in the right places, it became clear that the problem was in the __getattr__ method of the class JSONDictSafe. When pickle checks whether the class implements the __getstate__ and __setstate__ methods, and they are not implemented, __getattr__ ends up being called and returns a JSONStrSafe instance as the default value for those attributes, which pickle then tries to call. So rather than supplying those attributes by defining the methods as I did above, a simpler fix is to add a check as follows:
class JSONDictSafe(JSONDict):
    "Allows dotted access"
    def __getattr__(self, attr, default=None):
        if attr in ('__getstate__', '__setstate__'):
            raise AttributeError(f'Missing attribute: {attr}')
        """ rest of the method is unmodified """

This post, "python - JSON serializable object errors in a multiprocessing call - TypeError: XXX objects not callable error", corresponds to a similar question on Stack Overflow: https://stackoverflow.com/questions/65876256/
