python - 如何使内置容器(集合、字典、列表)线程安全?

标签 python multithreading thread-safety

我了解 from this question如果我想拥有一个线程安全的 set 我必须自己实现线程安全部分。

因此我可以想出:

from threading import Lock

class LockedSet(set):
    """A set where add() and remove() are thread-safe"""

    def __init__(self, *args, **kwargs):
        # Create a lock
        self._lock = Lock()
        # Call the original __init__
        super(LockedSet, self).__init__(*args, **kwargs)

    def add(self, elem):
        self._lock.acquire()
        try:
            super(LockedSet, self).add(elem)
        finally:
            self._lock.release()

    def remove(self, elem):
        self._lock.acquire()
        try:
            super(LockedSet, self).remove(elem)
        finally:
            self._lock.release()

所以,当然只有 add() 和 remove() 在这个实现中是线程安全的。其他方法不是因为它们没有在子类中被覆盖。

现在,模式很简单:获取锁,调用原始方法,释​​放锁。 如果我遵循上述逻辑,我将不得不以基本相同的方式覆盖 set 公开的所有方法,例如:

(伪代码)

def <method>(<args>):
    1. acquire lock
    2. try:
    3.     call original method passing <args>
    4. finally:
    5.     release lock

(/伪代码)

这不仅乏味而且容易出错。那么,关于如何以更好的方式解决此问题的任何想法/建议?

最佳答案

您可以使用 Python 的元编程工具来完成此操作。 (注意:写得很快,没有经过全面测试。)我更喜欢使用类装饰器。

我也认为您可能需要锁定比 addremove 更多的东西才能使 set 线程安全,但我不是当然。我会忽略这个问题,只专注于你的问题。

还要考虑委托(delegate)(代理)是否比子类化更合适。包装对象是 Python 中常用的方法。

最后,没有元编程的“魔杖”可以神奇地为任何可变 Python 集合添加细粒度锁定。最安全的做法是使用 RLock 锁定 any 方法或属性访问,但这是非常粗粒度且缓慢的,并且可能仍然不能保证您的对象将被在所有情况下都是线程安全的。 (例如,您可能有一个集合,该集合操作其他线程可访问的另一个非线程安全对象。)您确实需要检查每个数据结构并考虑哪些操作是原子操作或需要锁以及哪些方法可能调用其他方法使用相同的锁(即死锁本身)。

也就是说,您可以按照抽象的递增顺序使用以下技术:

代表团

class LockProxy(object):
    def __init__(self, obj):
        self.__obj = obj
        self.__lock = RLock()
        # RLock because object methods may call own methods
    def __getattr__(self, name):
        def wrapped(*a, **k):
            with self.__lock:
                getattr(self.__obj, name)(*a, **k)
        return wrapped

lockedset = LockProxy(set([1,2,3]))

上下文管理器

class LockedSet(set):
    """A set where add(), remove(), and 'in' operator are thread-safe"""

    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(LockedSet, self).__init__(*args, **kwargs)

    def add(self, elem):
        with self._lock:
            super(LockedSet, self).add(elem)

    def remove(self, elem):
        with self._lock:
            super(LockedSet, self).remove(elem)

    def __contains__(self, elem):
        with self._lock:
            super(LockedSet, self).__contains__(elem)

装饰器

def locked_method(method):
    """Method decorator. Requires a lock object at self._lock"""
    def newmethod(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return newmethod

class DecoratorLockedSet(set):
    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(DecoratorLockedSet, self).__init__(*args, **kwargs)

    @locked_method
    def add(self, *args, **kwargs):
        return super(DecoratorLockedSet, self).add(elem)

    @locked_method
    def remove(self, *args, **kwargs):
        return super(DecoratorLockedSet, self).remove(elem)

类装饰器

我认为这是抽象方法中最简洁、最容易理解的方法,因此我对其进行了扩展,以允许指定要锁定的方法和锁定对象工厂。

def lock_class(methodnames, lockfactory):
    return lambda cls: make_threadsafe(cls, methodnames, lockfactory)

def lock_method(method):
    if getattr(method, '__is_locked', False):
        raise TypeError("Method %r is already locked!" % method)
    def locked_method(self, *arg, **kwarg):
        with self._lock:
            return method(self, *arg, **kwarg)
    locked_method.__name__ = '%s(%s)' % ('lock_method', method.__name__)
    locked_method.__is_locked = True
    return locked_method


def make_threadsafe(cls, methodnames, lockfactory):
    init = cls.__init__
    def newinit(self, *arg, **kwarg):
        init(self, *arg, **kwarg)
        self._lock = lockfactory()
    cls.__init__ = newinit

    for methodname in methodnames:
        oldmethod = getattr(cls, methodname)
        newmethod = lock_method(oldmethod)
        setattr(cls, methodname, newmethod)

    return cls


@lock_class(['add','remove'], Lock)
class ClassDecoratorLockedSet(set):
    @lock_method # if you double-lock a method, a TypeError is raised
    def frobnify(self):
        pass

使用 __getattribute__

覆盖属性访问
class AttrLockedSet(set):
    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(AttrLockedSet, self).__init__(*args, **kwargs)

    def __getattribute__(self, name):
        if name in ['add','remove']:
            # note: makes a new callable object "lockedmethod" on every call
            # best to add a layer of memoization
            lock = self._lock
            def lockedmethod(*args, **kwargs):
                with lock:
                    return super(AttrLockedSet, self).__getattribute__(name)(*args, **kwargs)
            return lockedmethod
        else:
            return super(AttrLockedSet, self).__getattribute__(name)

使用 __new__

动态添加包装器方法
class NewLockedSet(set):
    def __new__(cls, *args, **kwargs):
        # modify the class by adding new unbound methods
        # you could also attach a single __getattribute__ like above
        for membername in ['add', 'remove']:
            def scoper(membername=membername):
                # You can also return the function or use a class
                def lockedmethod(self, *args, **kwargs):
                    with self._lock:
                        m = getattr(super(NewLockedSet, self), membername)
                        return m(*args, **kwargs)
                lockedmethod.__name__ = membername
                setattr(cls, membername, lockedmethod)
        self = super(NewLockedSet, cls).__new__(cls, *args, **kwargs)
        self._lock = Lock()
        return self

使用 __metaclass__

动态添加包装器方法
def _lockname(classname):
    return '_%s__%s' % (classname, 'lock')

class LockedClass(type):
    def __new__(mcls, name, bases, dict_):
        # we'll bind these after we add the methods
        cls = None
        def lockmethodfactory(methodname, lockattr):
            def lockedmethod(self, *args, **kwargs):
                with getattr(self, lockattr):
                    m = getattr(super(cls, self), methodname)
                    return m(*args,**kwargs)
            lockedmethod.__name__ = methodname
            return lockedmethod
        lockattr = _lockname(name)
        for methodname in ['add','remove']:
            dict_[methodname] = lockmethodfactory(methodname, lockattr)
        cls = type.__new__(mcls, name, bases, dict_)
        return cls

    def __call__(self, *args, **kwargs):
        #self is a class--i.e. an "instance" of the LockedClass type
        instance = super(LockedClass, self).__call__(*args, **kwargs)
        setattr(instance, _lockname(self.__name__), Lock())
        return instance



class MetaLockedSet(set):
    __metaclass__ = LockedClass

动态创建的元类

def LockedClassMetaFactory(wrapmethods):
    class LockedClass(type):
        def __new__(mcls, name, bases, dict_):
            # we'll bind these after we add the methods
            cls = None
            def lockmethodfactory(methodname, lockattr):
                def lockedmethod(self, *args, **kwargs):
                    with getattr(self, lockattr):
                        m = getattr(super(cls, self), methodname)
                        return m(*args,**kwargs)
                lockedmethod.__name__ = methodname
                return lockedmethod
            lockattr = _lockname(name)
            for methodname in wrapmethods:
                dict_[methodname] = lockmethodfactory(methodname, lockattr)
            cls = type.__new__(mcls, name, bases, dict_)
            return cls

        def __call__(self, *args, **kwargs):
            #self is a class--i.e. an "instance" of the LockedClass type
            instance = super(LockedClass, self).__call__(*args, **kwargs)
            setattr(instance, _lockname(self.__name__), Lock())
            return instance
    return LockedClass

class MetaFactoryLockedSet(set):
    __metaclass__ = LockedClassMetaFactory(['add','remove'])

我敢打赌,使用简单明确的 try...finally 现在看起来还不错,对吧?

读者练习:让调用者使用这些方法中的任何一种方法传入他们自己的 Lock() 对象(依赖注入(inject))。

关于python - 如何使内置容器(集合、字典、列表)线程安全?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13610654/

相关文章:

c++ - 为什么子类对象调用母类的成员函数?

ruby 线程 : how to join threads from different places so that they are concurrent?

java - 在 EJB 程序中使用并发集合

java - ArrayBlockingQueue : access to specific element

python - IDLE自动挤占大量线路

python - 将列表转换为元组 python

python - 您传递给模型的 Numpy 数组列表不是模型预期的大小。预计会看到 2 个数组

python 语法错误 无效语法

java - 如何将队列从另一个线程添加到一个线程?

c# - 比较和交换有什么用?