multithreading - 多生产者单消费者惰性任务执行

标签 multithreading

我正在尝试对一个系统进行建模,其中有多个线程生成数据,并且单个线程消耗数据。诀窍是,我不希望专用线程消耗数据,因为所有线程都位于池中。相反,我希望其中一个生产者在有工作时清空队列,并在另一个生产者已经清除队列时让出。

基本思想是有一个工作队列,以及围绕处理的锁。每个生产者将其有效负载推送到队列中,然后尝试进入锁。该尝试是非阻塞的,并且返回 true(已获取锁)或 false(锁由其他人持有)。

如果获取了锁,则该线程将处理队列中的所有数据,直到队列为空(包括处理过程中其他生产者引入的任何新有效负载)。一旦处理完所有工作,线程就会释放锁并退出。

以下是该算法的 C++ 代码:

void Process(ITask *task) {
     // queue is a thread safe implementation of a regular queue
     queue.push(task);

     // crit_sec is some handle to a critical section like object
     // try_scoped_lock uses RAII to attempt to acquire the lock in the constructor
     //                 if the lock was acquired, it will release the lock in the
     //                 destructor
     try_scoped_lock lock(crit_sec);

     // See if this thread won the lottery. Prize is doing all of the dishes
     if (!lock.Acquired())
        return;

     // This thread got the lock, so it needs to do the work
     ITask *currTask;
     while (queue.try_pop(currTask)) {
          ... execute task ...
     }
}

总的来说,这段代码工作得很好,我从未真正目睹过我将在下面描述的行为,但这种实现让我感到不安。按理说,线程退出 while 循环和释放临界区之间会引入竞争条件。

整个算法依赖于这样的假设:如果持有锁,则线程正在为队列提供服务。

我本质上是在寻找两个问题的启示:

  1. 我是否正确,存在所描述的竞赛条件(其他竞赛的奖励)
  2. 是否有一个标准模式来实现这种高性能且不会引入竞争条件的机制?

最佳答案

是的,存在竞争条件。

线程 A 添加一个任务,获取,处理自身,然后从队列请求任务。被拒绝了。

此时线程 B 将一个任务添加到队列中。然后它尝试获取锁,但失败了,因为线程 A 拥有锁。线程 B 退出。

然后线程 A 退出,队列非空,并且没有人处理其上的任务。

这将很难找到,因为该窗口相对较窄。为了使其更容易找到,在 while 循环之后引入“ sleep 10 秒”。在调用代码中,插入一个任务,等待 5 秒,然后插入第二个任务。再过 10 秒,检查两个插入任务都已完成,并且队列上仍然有一个任务需要处理。

解决此问题的一种方法是将 try_pop 更改为 try_pop_or_unlock,并将您的 lock 传递给它。 try_pop_or_unlock 然后自动检查队列是否为空,如果是,则解锁lock并返回 false。

另一种方法是改进线程池。添加一个基于计数信号量的“消耗”任务启动器。

semaphore_bool bTaskActive;
counting_semaphore counter;

when (counter || !bTaskActive)
  if (bTaskActive)
    return
  bTaskActive = true
  --counter
  launch_task( process_one_off_queue, when_done( [&]{ bTaskActive=false ) );

当计数信号量处于事件状态时,或者当已完成的消费任务触发时,如果没有事件的消费任务,它会启动消费任务。

但这只是我的想法。

关于multithreading - 多生产者单消费者惰性任务执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15147259/

相关文章:

安卓线程 : is it necessary to wait for threads to start before "join"ing them?

C# BackgroundWorker 和 ProgressBar 问题

java - ExecutorService 和 CountDownLatch 的线程安全

java - 检测CompletableFuture链中的超时

multithreading - 您如何订阅线程间的状态更改?

Java 生产者消费者 - 为什么不是一个简单的固定线程池?

java - 需要 "synchronized"的代码在没有它的情况下工作正常

java - 使用线程将两个矩阵相乘

c# - 为什么这个线程方法需要 900 毫秒才能完成?

multithreading - Lisp 中的线程同步