java - 确定嵌套消费者完成情况

标签 java concurrency parallel-processing

如何同步大量嵌套生产者-消费者的总体消费者完成情况?

例如,假设您有一个“USPS”系统实现:

  1. 运行一个 Backoffice.start 函数来启动一个 Runnable 我的 SubOffice 对象中的 SubOffice.startMailProcessing
  2. 我的全部 SubOffice 对象使用者“启动”函数现在开始处理 邮件
  3. 每个 SubOffice 对象都会向使用者进行自己的扇出 它启动 MailTruck.deliverMailRunnable
  4. 等待以上所有邮件 处理当天邮件和所有卡车投递的流程 邮件
  5. 重复

我可以设想一些解决方案:

  1. 似乎一种方法是关闭执行器并等待所有 这些 Runnable 需要完成,但这并不理想,因为您 必须从头开始为执行程序重新创建这些线程 重复循环。
  2. Java 8 提供了一个构造 CompletableFuture 来解决这个问题。然而,对于严重嵌套的生产者-消费者模式,人们可能需要实现“回调 hell ”来将所有 future 冒泡到初始生产者。
  3. 使用Phaser(即CountDownLatch 类固醇)并用移相器注册所有消费者,以及 完成后取消注册,同时让主线程等待 已经完成了。
  4. 一些基于响应式(Reactive)声明的技术(例如 RxJava)似乎可以处理其中一些类型的情况。

我想象等待完成然后循环,就像上面的例子一样,重复一个循环是一个常见的问题。 此类问题的标准解决方案或架构是什么?

最佳答案

如果您想依赖 Java8 的 CompletableFuture,您可以使用 CompletableFuture.thenCompose() 将嵌套的 future 扁平化为单个 future。 (这相当于在更面向功能的语言中通常所说的平面 map )。

要将 futures 列表合并为一个 future,您可以使用 CompletableFuture.allOf(),它返回一个 future,当所有单独的 future 完成时,该 future 也完成。

结合这两种方法可以通过从所有子任务的各种 future 中组合一个 future 来避免回调 hell 。

关于java - 确定嵌套消费者完成情况,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47142415/

相关文章:

C++ 线程安全队列关闭

perl - 使用 Perl 将进程分配给核心

Java在读取文件但同时使用流时避免java.lang.OutOfMemoryError

java - 如何确保每个 3x3 block 都包含数独中的值

java - Oracle 序列可以为空吗?

swift - Swift 中没有信号量的并发

multithreading - 英特尔 SFENCE 有发布语义吗?

c - 为什么这段使用openmp计算Pi值的代码每次给出的答案(最后几个 float )都略有不同?

java - 在所有字段中查找 URL 的一部分 (elasticsearch)

java - 如何将 ear 应用程序重写为 Spring