对于类似supervisor的项目,我使用threading库来管理 一些子进程。在某些时候,用户可以提示命令发送 进程管理线程的指令。这些命令存储在 在主进程和进程管理线程之间共享的队列对象。 我想我需要互斥量来解决并发问题所以我做了一点 脚本来尝试它,但首先没有互斥锁以确保我得到预期的结果 并发问题。
我期望脚本每秒打印一个困惑的 int 列表:
import threading
import time
def longer(l, mutex=None):
while 1:
last_val = l[-1]
l.append(last_val + 1)
time.sleep(1)
return
dalist = [0]
t = threading.Thread(target=longer, args=(dalist,))
t.daemon = True
t.start()
while 1:
last_val = dalist[-1]
dalist.append(last_val + 1)
print dalist
time.sleep(1)
但实际上它打印了一个很好的列表,如下所示:
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4, 5, 6]
来自 this在另一篇文章中回答我认为它来自线程库,所以我对多处理库做了同样的事情:
import multiprocessing as mp
import time
def longer(l, mutex=None):
while 1:
last_val = l[-1]
l.append(last_val + 1)
time.sleep(1)
return
dalist = [0]
t = mp.Process(target=longer, args=(dalist,))
t.start()
while 1:
last_val = dalist[-1]
dalist.append(last_val + 1)
print dalist
time.sleep(1)
但我得到了相同的结果,有点“慢”:
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
所以我想知道我是否真的需要互斥量来管理类似队列的对象 线程之间共享??? 而且,从上面的代码之一,我怎样才能有效地重现 我搜索的预期并发问题?
感谢阅读
编辑 1: 来自user4815162342的评论我更改了第一个片段,并通过在值检索和列表附加之间的“更长”函数内移动 sleep 调用来设法获得某种竞争条件:
import threading
import time
def longer(l, mutex=None):
while 1:
last_val = l[-1]
time.sleep(1)
l.append(last_val + 1)
return
dalist = [0]
t = threading.Thread(target=longer, args=(dalist,))
t.daemon = True
t.start()
while 1:
last_val = dalist[-1]
dalist.append(last_val + 1)
print dalist
time.sleep(1)
它给我这样的东西:
[0, 1]
[0, 1, 1, 2]
[0, 1, 1, 2, 2, 3]
[0, 1, 1, 2, 2, 3, 3, 4]
并且我设法使用这样的线程锁解决了我的人为问题:
import threading
import time
def longer(l, mutex=None):
while 1:
if mutex is not None:
mutex.acquire()
last_val = l[-1]
time.sleep(1)
l.append(last_val + 1)
if mutex is not None:
mutex.release()
return
dalist = [0]
mutex = threading.Lock()
t = threading.Thread(target=longer, args=(dalist, mutex))
t.daemon = True
t.start()
while 1:
if mutex is not None:
mutex.acquire()
last_val = dalist[-1]
dalist.append(last_val + 1)
if mutex is not None:
mutex.release()
print dalist
time.sleep(1)
然后产生:
[0, 1, 2]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
最佳答案
您的第一个代码片段包含竞争条件并且确实需要互斥体。全局解释器锁使竞争条件很少见,因为一个线程在任何给定时间都在运行。但是,每执行几条字节码指令,当前线程就会放弃全局解释器锁的所有权,让其他线程有机会运行。所以,给定你的代码:
last_val = dalist[-1]
dalist.append(last_val + 1)
如果字节码切换发生在执行第一行之后,另一个线程将选取相同的 last_val
并将其附加到列表中。将控制权交还给初始线程后,存储在 last_val
中的值将第二次附加到列表中。互斥锁会以明显的方式阻止竞争:列表访问和追加之间的上下文切换会将控制权交给另一个线程,但它会立即在互斥锁中被阻塞,并将控制权交还给原始线程。
您的第二个示例仅“有效”,因为这两个进程具有单独的列表实例。修改一个列表不会影响另一个,因此另一个进程也可能没有运行。尽管 multiprocessing
有一个用于 threading
的直接替换 API,但底层概念有很大不同,从一个切换到另一个时需要考虑到这一点。
关于Python 线程/多处理不需要 Mutex?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33435174/