python - 在Python中,互斥线程锁定,在互斥/锁释放时删除排队函数?

标签 python multithreading mutex critical-section

这是我遇到的问题:我正在使用Python 2.7,并且我有一个在线程中运行的代码,该线程有一个临界区域,此时只能有一个线程执行。该代码目前没有互斥机制,因此我想询问我可以在我的特定用例中使用什么,其中涉及“删除”“排队”函数。我尝试使用以下最小工作示例来模拟该行为:

useThreading=False # True

if useThreading:  from threading import Thread, Lock
else:             from multiprocessing import Process, Lock

mymutex = Lock()

import time
tstart = None

def processData(data):
  #~ mymutex.acquire()
  try:
    print('thread {0} [{1:.5f}] Do some stuff'.format(data, time.time()-tstart))
    time.sleep(0.5)
    print('thread {0} [{1:.5f}] 1000'.format(data, time.time()-tstart))
    time.sleep(0.5)
    print('thread {0} [{1:.5f}] done'.format(data, time.time()-tstart))
  finally:
    #~ mymutex.release()
    pass

# main:
tstart = time.time()
for ix in xrange(0,3):
  if useThreading: t = Thread(target = processData, args = (ix,))
  else: t = Process(target = processData, args = (ix,))
  t.start()
  time.sleep(0.001)

现在,如果运行此代码,您将得到如下打印输出:

thread 0 [0.00173] Do some stuff
thread 1 [0.00403] Do some stuff
thread 2 [0.00642] Do some stuff
thread 0 [0.50261] 1000
thread 1 [0.50487] 1000
thread 2 [0.50728] 1000
thread 0 [1.00330] done
thread 1 [1.00556] done
thread 2 [1.00793] done

也就是说,三个线程很快就会相继“排队”(大约相隔 2-3 毫秒)。实际上,它们不会排队,它们只是在彼此相隔 2-3 毫秒后开始并行执行。

现在,如果我启用 mymutex.acquire()/.release() 命令,我会得到预期的结果:

thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
thread 1 [1.00350] Do some stuff
thread 1 [1.50462] 1000
thread 1 [2.00531] done
thread 2 [2.00547] Do some stuff
thread 2 [2.50638] 1000
thread 2 [3.00706] done

基本上,现在有了锁定,线程不会并行运行,但由于锁定,它们会一个接一个地运行 - 只要一个线程正在工作,其他线程就会在 .acquire() 处阻塞。但这也不完全是我想要实现的目标。

我想要实现的是:让我们假设当 .acquire() 第一次被线程函数触发时,它会在队列。之后,其行为与 Lock 基本相同 - 当一个线程工作时,其他线程在 .acquire() 处阻塞。当第一个线程完成时,它会进入finally: block - 在这里,我想检查一下队列中有多少线程正在等待;然后我想删除/删除所有等待线程除了最后一个线程 - 最后,我.release()锁定;这意味着在此之后,接下来将执行队列中最后一个线程。我想,我想编写类似以下伪代码的内容:

  ...
  finally:
    if (len(mymutex.queue) > 2): # more than this instance plus one other waiting:
      while (len(mymutex.queue) > 2):
        mymutex.queue.pop(1) # leave alone [0]=this instance, remove next element
    # at this point, there should be only queue[0]=this instance, and queue[1]= what was the last thread queued previously
    mymutex.release() # once we releace, queue[0] should be gone, and the next in the queue should acquire the mutex/lock..
    pass
  ...

有了这个,我希望打印输出像这样:

thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
# here upon lock release, thread 1 would be deleted - and the last one in the queue, thread 2, would acquire the lock next:
thread 2 [1.00350] Do some stuff
thread 2 [1.50462] 1000
thread 2 [2.00531] done

在 Python 中实现此目的最直接的方法是什么?

最佳答案

似乎您想要类似队列的行为,那么为什么不使用 Queue

import threading
from  Queue import Queue
import time

# threads advertise to this queue when they're waiting
wait_queue = Queue()
# threads get their task from this queue
task_queue = Queue()

def do_stuff():
    print "%s doing stuff" % str(threading.current_thread())
    time.sleep(5)
def queue_thread(sleep_time):

    # advertise current thread waiting
    time.sleep(sleep_time)  
    wait_queue.put("waiting")  

    # wait for permission to pass
    message = task_queue.get()

    print "%s got task: %s" % (threading.current_thread(), message)
    # unregister current thread waiting
    wait_queue.get()

    if message == "proceed":
        do_stuff()
        # kill size-1 threads waiting
        for _ in range(wait_queue.qsize() - 1):
            task_queue.put("die")
        # release last
        task_queue.put("proceed")

    if message == "die":
        print "%s died without doing stuff" % threading.current_thread()
        pass

t1 = threading.Thread(target=queue_thread, args=(1, ))
t2 = threading.Thread(target=queue_thread, args=(2, ))
t3 = threading.Thread(target=queue_thread, args=(3, ))
t4 = threading.Thread(target=queue_thread, args=(4, ))

# allow first thread to pass
task_queue.put("proceed")

t1.start()
t2.start()
t3.start()
t4.start()

thread-1 首先到达并“获取”该部分,其他线程稍后到达并在队列中等待(并通告它们正在等待)。然后,当线程 1 离开时,它会通过告诉所有其他线程终止并让最后一个线程继续执行来向队列中的最后一个线程授予权限。

您可以使用不同的消息进行更精细的控制,典型的消息是 wait_queue 中的线程 ID(这样您就知道在等待,以及顺序它到达了)。

当您确定需要时,您可以利用对您有利的非阻塞操作( queue.put(block=False)queue.get(block=False) )。

关于python - 在Python中,互斥线程锁定,在互斥/锁释放时删除排队函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28084833/

相关文章:

python - 为什么列表元素替换比 python 中的字符串元素替换慢?

coding-style - 在 Python 中创建常量的约定

java - 设计模式: multiple swing interfaces listen for thread variable

c++ - 解除线程与另一个线程的阻塞

c - pthread_mutex_init() 中的段错误

python - output_notebook() 未定义 - 为什么?

python - 在python中找到特定字符串后如何打印所有行?

arrays - 数组上不确定函数的异步映射

多线程条件变量

Release模式下的 C# 互斥量与 Debug模式下的行为不同