java - 如何在我的用例中协调线程

标签 java multithreading scala

我有以下场景:

  1. 线程 A 将向线程 B 和线程 C 发送一些任务。(确切的任务数量未知)

  2. 对于每个任务,线程A会同时(异步)将其发送给B和C,然后如果B或C中的任何一个成功完成任务或者都失败了,A会继续发送下一个任务。这里的想法是尽可能避免阻塞。即,对于同一个任务,当B完成它而C还在处理时,A可以立即发送下一个任务,而不需要等待C得到结果。

  3. 预计 B 和 C 中较慢的一个可以跳过一些任务,只要任务由另一个完成即可。例如,B 可能完成任务 t1 t2 t3 t4,而 C 只完成 t1 t4,因为当 C 收到 t2 和 t3 时,由于某种原因它仍在处理 t1。

是否有适用于此的线程同步结构?我正在检查 java.util.concurrent.Phaser,但它似乎不符合我的需要。欢迎任何意见,提前致谢。

最佳答案

如果您使用 Future 或 actors 而不是线程作为构建 block ,这会更容易。直接在线程之上执行此操作会导致许多问题,因为您必须处理细节,例如对传入消息进行排队。另一个问题是,如果您想解决的问题反射(reflect)了您的要求,我不明白 - 在 2 个不同线程上执行两次相同任务 的值(value)在哪里?只是感觉不对。

这是一个天真的非阻塞实现,用于了解所涉及的内容,但不要在实际代码中这样做(真的要考虑更高级别的抽象):

val queue = new AtomicReference(Queue.empty[Runnable])

def worker() = new Thread(new Runnable {
  @tailrec
  def run() = {
    val currentQueue = queue.get
    if (currentQueue.nonEmpty) {
      val (task, updatedQueue) = currentQueue.dequeue
      try {
        task.run()
      } catch {
        case NonFatal(ex) =>
          ex.printStackTrace()
      }

      // if this fails, then another worker succeeded
      queue.compareAndSet(currentQueue, updatedQueue)
      // process next task in queue
      if (updatedQueue.nonEmpty) run()
    }
  }
})

@tailrec
def submitTask(task: Runnable): Unit =  {
  val currentQueue = queue.get
  val newQueue = currentQueue.enqueue(task)

  if (!queue.compareAndSet(currentQueue, newQueue))
    submitTask(task)
  else if (currentQueue.isEmpty) {
    // because of the CAS above, only 2 workers will be
    // active at the same time
    worker().start()
    worker().start()
  }
}

关于java - 如何在我的用例中协调线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24428376/

相关文章:

java - Resin 的 `pomegranate' - 自动神奇加载项目的 Maven jar 依赖项 - 但如何使用 pom.xml 为它们生成 jar?

java - 将数组拆分为 block 时出现问题

java - 如何停止崩溃的JLabels

scala - 更新 2d 计数表

java - 如何从网站获取非拉丁字符?

java - 具有身份验证的 Spring Boot 项目结构

java - 如何为Java并发应用制定优化场景?

Python cx_oracle 多线程不适用于每个线程的游标

scala - 当scala文件有中文注释字符时如何设置scala编译器

scala - 如何使用反射获取默认构造函数参数?