java - 如何动态更新 RX Observable?

标签 java rx-java reactive-programming rx-kotlin

(使用 RxKotlin 和 RxJava,但为了简单起见使用元代码)

许多 Reactive Extensions 指南都是从现有数据创建 Observable 开始的。来自 The introduction to Reactive Programming you've been missing ,它是从单个字符串创建的

var soureStream= Rx.Observable.just('https://api.github.com/users');

同样,来自 RxKotlin 的首页,来自填充列表

val list = listOf(1,2,3,4,5)
list.toObservable()     

现在考虑一个产生 outStream 的简单过滤器,

var outStream = sourceStream.filter({x > 3})

在这两个指南中,源事件都是先验声明的。这意味着事件的时间线有某种形式

source: ----1,2,3,4,5-------
out:    --------------4,5---

如何修改 sourceStream 使其更像管道?换句话说,在 sourceStream 创建期间没有可用的输入数据?当源事件可用时,它会立即由​​ out 处理:

source: ---1--2--3-4---5-------
out:    ------------4---5-------

我希望找到一个用于动态更新的Observable.add()

var sourceStream = Observable.empty()
var outStream = sourceStream.filter({x>3})

//print each element as its added 
sourceStream .subscribe({println(it)})
outStream.subscribe({println(it)})

for i in range(5):
    sourceStream.add(i)

这可能吗?

最佳答案

I'm new, but how could I solve my problem without a subject? If I'm testing an application, and I want it to "pop" an update every 5 seconds, how else can I do it other than this Publish subscribe business? Can someone post an answer to this question that doesn't involve a Subscriber?

如果您想每五秒弹出一次更新,则使用 interval 运算符创建一个 Observable,不要使用主题。有几十种不同的运算符可用于构建可观察对象,因此您很少需要主题。

也就是说,有时您确实需要一个,并且在测试代码时它们会非常方便。我在单元测试中广泛使用它们。

To Use Subject Or Not To Use Subject?这是一篇关于主题的优秀文章。

关于java - 如何动态更新 RX Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43945068/

相关文章:

javascript - 从 glassfish 服务器日志上的 javascript 函数打印 JSON 对象

java - NewString() & NewStringUTF() 显示错误无效修改后的 UTF-8 :

java - 使用java函数将字符 "a"转换为数字0

rx-java - 为什么在 RxJava 中阻塞调用栈被认为是不好的做法?

java - 重用同一个数组而不是声明一个新数组是否有意义?

android - PublishSubject 不适用于firstOrError()

java - 如何在Mono<Map<Entity, Integer>>上调用entrySet方法

android - RecyclerView 使用 RxJava 无限滚动

java - 在缓冲图像上绘图

java - 与 Flux 相交操作 - Project Reactor