python - 在线程内调用 condition.wait() 会导致检索任何 future 以阻塞主线程

标签 python multithreading threadpool wait future

我有在共享可重入读写锁的线程池中执行的任务。如果执行完成,这些任务将返回 future 。当锁发生争用时,可重入读写锁将等待一个条件。

我使用的库公开了 wait_for_any从一组任务中检索一个或多个完成的 future 的方法。然而,即使一个或多个 future 已经完成wait_for_any在所有 future 完成之前,方法将无法返回。此外,wait_for_any方法公开一个超时参数,如果设置该参数随后将被忽略。

我的问题是我做错了什么会导致这样的 wait_for_any阻止的方法?我是否理解 Python 对条件等待和通知的实现不正确,这些构造是否会完全阻塞 Python 中的每个线程?

我使用的库称为 Futurist,由 OpenStack 基金会维护。以下是我使用的相关类和方法的链接: GreenThreadPoolExecutor waiters.wait_for_any

这是 ReentrantReadWriteLock:

class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                 self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()

为了测试锁并为它无限期地阻塞,我执行以下操作:

def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test():
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))

    # Get the results and verify only one of the calls succeeded
    # assert that the other call is still pending
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result)
    self.assertEqual(1, len(results[1]))

在示例中执行 results = waiters.wait_for_any(futures)无限期地阻止。这让我彻底糊涂了。我希望有人可以向我解释这种行为。

更新 2019-10-16 18:55:00 UTC :
主线程的阻塞不限于这个 ReentrantReadWriteLock
实现,但在使用诸如 readerwriterlock 之类的库时也会发生.

更新 2019-10-17 08:15:00 UTC
我已将此作为错误报告提交给启动板上的 futurist 维护者,因为我认为这种行为是不正确的:launchpad bug report

更新 2019-10-20 09:02:00 UTC
从那以后,我观察到 future 主义图书馆进度被阻止的调用:waiter.event.wait(timeout)
一个类似的问题似乎被提交到 Python 3.3 和 3.4 并已被关闭:closed issue

更新 2019-10-21 09:06:00 UTC
已提交 future 主义库的补丁以尝试解决 this issue .

更新 2019-10-22 08:03:00 UTC
提交的补丁没有解决问题。追踪时waiter.event.wait(timeout)调用 waiter.acquire() 时 Python threading.py 等待函数中的调用块.

更新 2019-10-23 07:17:00 UTC
我创建了一个 small repository这表明使用 native ThreadPoolExecutor 和 future 可以实现这一点。我开始怀疑这是由 GIL 引起的 CPython 中的限制。下面的代码演示了
使用与上图相同的锁进行演示的操作:
from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor

def read_lock(lock):
    lock.read_acquire()

def write_lock(lock):
    lock.write_acquire()

def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)

更新 2019-10-31 07:36:00 UTC
可重入读写锁已更新,可用于 Python
2.7 并且与 demo repository on Github 中的内容保持一致.

此外,已经发现 2019-10-23 中描述的 native 线程池演示不起作用,因为与最后一条语句一起
future2 = executor.submit(write_lock, local_lock)
__exit__线程池的方法将被调用。自然地,这个方法试图干净地关闭所有当前运行的线程,这是由于持有锁而无法实现的。该示例已更新为 spin_for_any 示例:
futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    for f in futures:
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")

这个原生 Python 并发 spin_for_any 示例完全按预期工作。

最佳答案

在您的 ReentrantReadWriteLock类,尝试改变

self._condition = Condition()

关于python - 在线程内调用 condition.wait() 会导致检索任何 future 以阻塞主线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58410610/

相关文章:

asp.net - MVC Ajax请求不阻塞主线程

JavaFX 停止线程

multithreading - 使用 .NETs ThreadPool 时 Fps 下降

c# - 如何使用线程池和 Entity Framework 处理数据库操作?

python - 如何在 SQLAlchemy 中连接两个查询?

python - 检查 RDD 中是否存在值

python - 如何找到整数 1,2,3 加起来等于 n 的方式数?

python - C 代码在尝试通过 xmlrpc 进行 Python 远程过程调用时崩溃

java - 多线程JAVA中的静态方法行为

java - 如何在 java 中运行一个线程,让我知道另一个线程何时死亡?