rx-java - 如何在 rxjava 中实现平衡扇出?

标签 rx-java

基本上,我们有一个生产者一次产生一个随机数,几个消费者休眠 1 秒然后打印一个数字。

每个消费者都是唯一的,每个号码只能有一个接收者。
这种行为类似于 java 中的 JMS 队列或 BlockingQueue。

在akka流中,我可以找到

balance[T] – (1 input, N outputs) given an input element emits to one of its output ports.

但我在 rxjava 中找不到任何内置组件做同样的工作。
Observable 始终向所有观察者广播消息,如 pub-sub 样式。需要queue样式怎么办

我错过了什么吗?

最佳答案

我认为你的心智模型与 Rx 的构建并不匹配——想想许多小操作的流,而不是大型组件之间的消息。

我建议 a) 一个有上限的线程池 b) 一个围绕它的 RX 调度器然后 c:

databaseSource
.fetchItems()
.flatMap(item -> 
   Obsevable.just(item)
   .observeOn(cappedThreadScheduler)
   .map(item -> longRunningOperation(item))
)

哦,你也可以这样做:

databaseSource
.fetchItems()
.flatMap(item -> 
   Obsevable.just(item)
   .observeOn(schedulers.io())
   .map(item -> longRunningOperation(item))
   , 16
)

最多并行运行 16 个操作。

关于rx-java - 如何在 rxjava 中实现平衡扇出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40690137/

相关文章:

java - 从 Spring 后台订阅 RxJava Observable

java - 在循环内发出值 (RxJava)

java - RxJava 背压(快速生产者慢消费者)

java - RxAndroid - 来自多个 Observable 的 1 个响应

java - RxJava - 何时将 Observable 与创建方法一起使用

java - 从过滤后的 Observable 返回最终列表

android - 由 : rx. 异常引起。MissingBackpressureException

java - rx-java 中的套接字看门狗

rx-java - RxJava完成并进行测试

java - RxJava 与 Java 8 并行流