我有以下场景:
线程 A 将向线程 B 和线程 C 发送一些任务。(确切的任务数量未知)
对于每个任务,线程A会同时(异步)将其发送给B和C,然后如果B或C中的任何一个成功完成任务或者都失败了,A会继续发送下一个任务。这里的想法是尽可能避免阻塞。即,对于同一个任务,当B完成它而C还在处理时,A可以立即发送下一个任务,而不需要等待C得到结果。
预计 B 和 C 中较慢的一个可以跳过一些任务,只要任务由另一个完成即可。例如,B 可能完成任务 t1 t2 t3 t4,而 C 只完成 t1 t4,因为当 C 收到 t2 和 t3 时,由于某种原因它仍在处理 t1。
是否有适用于此的线程同步结构?我正在检查 java.util.concurrent.Phaser
,但它似乎不符合我的需要。欢迎任何意见,提前致谢。
最佳答案
如果您使用 Future
或 actors 而不是线程作为构建 block ,这会更容易。直接在线程之上执行此操作会导致许多问题,因为您必须处理细节,例如对传入消息进行排队。另一个问题是,如果您想解决的问题反射(reflect)了您的要求,我不明白 - 在 2 个不同线程上执行两次相同任务 的值(value)在哪里?只是感觉不对。
这是一个天真的非阻塞实现,用于了解所涉及的内容,但不要在实际代码中这样做(真的要考虑更高级别的抽象):
val queue = new AtomicReference(Queue.empty[Runnable])
def worker() = new Thread(new Runnable {
@tailrec
def run() = {
val currentQueue = queue.get
if (currentQueue.nonEmpty) {
val (task, updatedQueue) = currentQueue.dequeue
try {
task.run()
} catch {
case NonFatal(ex) =>
ex.printStackTrace()
}
// if this fails, then another worker succeeded
queue.compareAndSet(currentQueue, updatedQueue)
// process next task in queue
if (updatedQueue.nonEmpty) run()
}
}
})
@tailrec
def submitTask(task: Runnable): Unit = {
val currentQueue = queue.get
val newQueue = currentQueue.enqueue(task)
if (!queue.compareAndSet(currentQueue, newQueue))
submitTask(task)
else if (currentQueue.isEmpty) {
// because of the CAS above, only 2 workers will be
// active at the same time
worker().start()
worker().start()
}
}
关于java - 如何在我的用例中协调线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24428376/