基本上,我们有一个生产者一次产生一个随机数,几个消费者休眠 1 秒然后打印一个数字。
每个消费者都是唯一的,每个号码只能有一个接收者。
这种行为类似于 java 中的 JMS 队列或 BlockingQueue。
在akka流中,我可以找到
balance[T] – (1 input, N outputs) given an input element emits to one of its output ports.
但我在 rxjava 中找不到任何内置组件做同样的工作。
Observable 始终向所有观察者广播消息,如 pub-sub
样式。需要queue
样式怎么办
我错过了什么吗?
最佳答案
我认为你的心智模型与 Rx 的构建并不匹配——想想许多小操作的流,而不是大型组件之间的消息。
我建议 a) 一个有上限的线程池 b) 一个围绕它的 RX 调度器然后 c:
databaseSource
.fetchItems()
.flatMap(item ->
Obsevable.just(item)
.observeOn(cappedThreadScheduler)
.map(item -> longRunningOperation(item))
)
哦,你也可以这样做:
databaseSource
.fetchItems()
.flatMap(item ->
Obsevable.just(item)
.observeOn(schedulers.io())
.map(item -> longRunningOperation(item))
, 16
)
最多并行运行 16 个操作。
关于rx-java - 如何在 rxjava 中实现平衡扇出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40690137/