(使用 RxKotlin 和 RxJava,但为了简单起见使用元代码)
许多 Reactive Extensions
指南都是从现有数据创建 Observable
开始的。来自 The introduction to Reactive Programming you've been missing ,它是从单个字符串创建的
var soureStream= Rx.Observable.just('https://api.github.com/users');
同样,来自 RxKotlin 的首页,来自填充列表
val list = listOf(1,2,3,4,5)
list.toObservable()
现在考虑一个产生 outStream
的简单过滤器,
var outStream = sourceStream.filter({x > 3})
在这两个指南中,源事件都是先验声明的。这意味着事件的时间线有某种形式
source: ----1,2,3,4,5-------
out: --------------4,5---
如何修改 sourceStream
使其更像管道?换句话说,在 sourceStream
创建期间没有可用的输入数据?当源事件可用时,它会立即由 out 处理:
source: ---1--2--3-4---5-------
out: ------------4---5-------
我希望找到一个用于动态更新的Observable.add()
var sourceStream = Observable.empty()
var outStream = sourceStream.filter({x>3})
//print each element as its added
sourceStream .subscribe({println(it)})
outStream.subscribe({println(it)})
for i in range(5):
sourceStream.add(i)
这可能吗?
最佳答案
I'm new, but how could I solve my problem without a subject? If I'm testing an application, and I want it to "pop" an update every 5 seconds, how else can I do it other than this Publish subscribe business? Can someone post an answer to this question that doesn't involve a Subscriber?
如果您想每五秒弹出一次更新,则使用 interval
运算符创建一个 Observable,不要使用主题。有几十种不同的运算符可用于构建可观察对象,因此您很少需要主题。
也就是说,有时您确实需要一个,并且在测试代码时它们会非常方便。我在单元测试中广泛使用它们。
To Use Subject Or Not To Use Subject?这是一篇关于主题的优秀文章。
关于java - 如何动态更新 RX Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43945068/