我有在共享可重入读写锁的线程池中执行的任务。如果执行完成,这些任务将返回 future 。当锁发生争用时,可重入读写锁将等待一个条件。
我使用的库公开了 wait_for_any
从一组任务中检索一个或多个完成的 future 的方法。然而,即使一个或多个 future 已经完成wait_for_any
在所有 future 完成之前,方法将无法返回。此外,wait_for_any
方法公开一个超时参数,如果设置该参数随后将被忽略。
我的问题是我做错了什么会导致这样的 wait_for_any
阻止的方法?我是否理解 Python 对条件等待和通知的实现不正确,这些构造是否会完全阻塞 Python 中的每个线程?
我使用的库称为 Futurist,由 OpenStack 基金会维护。以下是我使用的相关类和方法的链接: GreenThreadPoolExecutor
和 waiters.wait_for_any
这是 ReentrantReadWriteLock:
class ReentrantReadWriteLock(object):
def __init__(self):
self._read_lock = RLock()
self._write_lock = RLock()
self._condition = Condition
self._num_readers = 0
self._wants_write = False
def read_acquire(self, blocking=True):
int_lock = False
try:
if self._read_lock.acquire(blocking):
int_lock = True
LOG.warning("read internal lock acquired")
while self._wants_write:
LOG.warning("read wants write true")
if not blocking:
LOG.warning("read non blocking")
return False
LOG.warning("read wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("read acquired lock")
self._num_readers += 1
return True
LOG.warning("read internal lock failed")
return False
finally:
if int_lock:
self._read_lock.release()
def write_acquire(self, blocking=True):
int_lock = False
try:
if self._write_lock.acquire(blocking):
int_lock = True
LOG.warning("write internal lock acquired")
while self._num_readers > 0 or self._wants_write:
LOG.warning("write wants write true or num read")
if not blocking:
LOG.warning("write non blocking")
return False
LOG.warning("write wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("write acquired lock")
self._wants_write = True
return True
LOG.warning("write internal lock failed")
return False
finally:
if int_lock:
self._write_lock.release()
为了测试锁并为它无限期地阻塞,我执行以下操作:
def get_read(self, rrwlock):
return rrwlock.read_acquire()
def get_write(self, rrwlock):
return rrwlock.write_acquire()
def test():
self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
rrwlock = ReentrantReadWriteLock()
futures = []
futures.append(self._threadpool.submit(self.get_read, rrwlock))
futures.append(self._threadpool.submit(self.get_write, rrwlock))
# Get the results and verify only one of the calls succeeded
# assert that the other call is still pending
results = waiters.wait_for_any(futures)
self.assertTrue(results[0].pop().result)
self.assertEqual(1, len(results[1]))
在示例中执行
results = waiters.wait_for_any(futures)
无限期地阻止。这让我彻底糊涂了。我希望有人可以向我解释这种行为。更新 2019-10-16 18:55:00 UTC :
主线程的阻塞不限于这个 ReentrantReadWriteLock
实现,但在使用诸如 readerwriterlock 之类的库时也会发生.
更新 2019-10-17 08:15:00 UTC
我已将此作为错误报告提交给启动板上的 futurist 维护者,因为我认为这种行为是不正确的:launchpad bug report
更新 2019-10-20 09:02:00 UTC
从那以后,我观察到 future 主义图书馆进度被阻止的调用:waiter.event.wait(timeout)
一个类似的问题似乎被提交到 Python 3.3 和 3.4 并已被关闭:closed issue
更新 2019-10-21 09:06:00 UTC
已提交 future 主义库的补丁以尝试解决 this issue .
更新 2019-10-22 08:03:00 UTC
提交的补丁没有解决问题。追踪时
waiter.event.wait(timeout)
调用 waiter.acquire() 时 Python threading.py 等待函数中的调用块.更新 2019-10-23 07:17:00 UTC
我创建了一个 small repository这表明使用 native ThreadPoolExecutor 和 future 可以实现这一点。我开始怀疑这是由 GIL 引起的 CPython 中的限制。下面的代码演示了
使用与上图相同的锁进行演示的操作:
from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor
def read_lock(lock):
lock.read_acquire()
def write_lock(lock):
lock.write_acquire()
def main():
local_lock = ReentrantReadWriteLock()
with ThreadPoolExecutor(max_workers=2) as executor:
# First task will submit fine
future = executor.submit(read_lock, local_lock)
# Second one will block indefinitely
future2 = executor.submit(write_lock, local_lock)
更新 2019-10-31 07:36:00 UTC
可重入读写锁已更新,可用于 Python
2.7 并且与 demo repository on Github 中的内容保持一致.
此外,已经发现 2019-10-23 中描述的 native 线程池演示不起作用,因为与最后一条语句一起
future2 = executor.submit(write_lock, local_lock)
__exit__
线程池的方法将被调用。自然地,这个方法试图干净地关闭所有当前运行的线程,这是由于持有锁而无法实现的。该示例已更新为 spin_for_any 示例:futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))
# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
for f in futures:
if f.done():
futures.remove(f)
f.result()
print("Future done")
这个原生 Python 并发 spin_for_any 示例完全按预期工作。
最佳答案
在您的 ReentrantReadWriteLock
类,尝试改变
self._condition = Condition()
关于python - 在线程内调用 condition.wait() 会导致检索任何 future 以阻塞主线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58410610/