Java Flux 与 Observable/BehaviorSubject

标签 java spring rx-java project-reactor reactive-streams

我的问题是 Flux 是否有能力表现得像 Observable 或 BehaviorSubject。我想我明白了 Flux 的作用和方式,但我看到的每个教程都会创建静态内容的 Flux,即一些预先存在的数字数组,这些数组本质上是有限的。

但是,我希望我的 Flux 成为随时间变化的未知值流……就像 Observable 或 BehaviorSubject。有了这些,您可以创建一个方法,如 setNextValue(String value),并将这些值泵送到 Observable/BehaviorSubject 等的所有订阅者。

使用 Flux 是否可行?还是 Flux 必须先由 Observable 类型的值流组成?

更新

我用下面的实现回答了我自己的问题。接受的答案可能会导致相同的路径,但稍微复杂一些。

最佳答案

every tutorial I see creates a Flux of static content, i.e. some pre-existing array of numbers which are finite in nature.

你会看到这一点,因为大多数教程都关注如何操作和使用 Flux - 但这里的含义(你可以只使用静态的 Flux,固定长度的内容)既不幸又错误。它比这更强大,并且将它与此类静态内容一起使用几乎可以肯定不是您在现实世界中看到的它的使用方式。

基本上有 3 种不同的方法来实例化 Flux 以按照您的描述动态发射元素:

However, I want my Flux to be a stream of unknown values over time... like an Observable or BehaviorSubject. With those, you can create a method like setNextValue(String value), and pump those values to all subscribers of the Observable/BehaviorSubject etc.

当然 - 看看 Flux.push() .这会暴露一个发射器,您可以随时调用 emitter.next(value)。只要您愿意,此流就可以继续(无限期,如果需要的话)。Flux.create()本质上是 Flux.push() 的多线程变体,它也可能有用。

Flux.generate()也可能值得一看 - 这有点像 Flux.push() 的“按需”版本,您只在下游消费者请求时通过回调发出下一个元素,而不是随时随地发射。这并不总是可行的,但如果用例使其可行,则使用此方法是有意义的,因为它考虑了背压,因此可以保证不会因超出其处理能力的请求而使消费者不知所措。

关于Java Flux 与 Observable/BehaviorSubject,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62475651/

相关文章:

java - 如何找到完整显示窗口标题的最小宽度

java - Scala 中的 @remote 注解类型不匹配

java - 我如何在java中选择哪个端口可以免费用于ServerSocket或Socket?

spring - 类型不匹配 : cannot convert from String to ListenableFuture<String>

android - 处理RxJava的onErrorReturn和OnErrorNotImplementedException

android - 在 Subject 的 onNext 上触发 Observable 并分享结果

java - 如何使用 Appium 在移动应用程序中获取 toastr 消息的文本

spring - 不再支持 Spring Roo?

java - Spring 启动 : Change property placeholder signifier

android - RXJava - 一个接一个地运行多个可观察对象(像 concat,但每个可观察对象都有 onCompleted)